This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new d5a6cf4e5a Fix ObjectStore.LocalFileSystem.put_opts for blobfuse (#5094)
d5a6cf4e5a is described below

commit d5a6cf4e5aaf9a4c5f2777c81aea9ef315578b2d
Author: Robin Lin <[email protected]>
AuthorDate: Mon Nov 27 19:07:45 2023 +0800

    Fix ObjectStore.LocalFileSystem.put_opts for blobfuse (#5094)
    
    * Fix ObjectStore.LocalFileSystem.put_opts for blobfuse
    
    * Fix ObjectStore.LocalFileSystem.put_opts for blobfuse
    
    * fix comment
    
    * fix race condition
    
    * add comment
---
 object_store/src/local.rs | 56 +++++++++++++++++++++++++++--------------------
 1 file changed, 32 insertions(+), 24 deletions(-)

diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index dd71d9ec12..71b96f058c 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -338,28 +338,41 @@ impl ObjectStore for LocalFileSystem {
         maybe_spawn_blocking(move || {
             let (mut file, suffix) = new_staged_upload(&path)?;
             let staging_path = staged_upload_path(&path, &suffix);
+            let mut e_tag = None;
 
             let err = match file.write_all(&bytes) {
-                Ok(_) => match opts.mode {
-                    PutMode::Overwrite => match std::fs::rename(&staging_path, &path) {
-                        Ok(_) => None,
-                        Err(source) => Some(Error::UnableToRenameFile { source }),
-                    },
-                    PutMode::Create => match std::fs::hard_link(&staging_path, &path) {
-                        Ok(_) => {
-                            let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
-                            None
+                Ok(_) => {
+                    let metadata = file.metadata().map_err(|e| Error::Metadata {
+                        source: e.into(),
+                        path: path.to_string_lossy().to_string(),
+                    })?;
+                    e_tag = Some(get_etag(&metadata));
+                    match opts.mode {
+                        PutMode::Overwrite => {
+                            // For some fuse types of file systems, the file must be closed first
+                            // to trigger the upload operation, and then renamed, such as Blobfuse
+                            std::mem::drop(file);
+                            match std::fs::rename(&staging_path, &path) {
+                                Ok(_) => None,
+                                Err(source) => Some(Error::UnableToRenameFile { source }),
+                            }
                         }
-                        Err(source) => match source.kind() {
-                            ErrorKind::AlreadyExists => Some(Error::AlreadyExists {
-                                path: path.to_str().unwrap().to_string(),
-                                source,
-                            }),
-                            _ => Some(Error::UnableToRenameFile { source }),
+                        PutMode::Create => match std::fs::hard_link(&staging_path, &path) {
+                            Ok(_) => {
+                                let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
+                                None
+                            }
+                            Err(source) => match source.kind() {
+                                ErrorKind::AlreadyExists => Some(Error::AlreadyExists {
+                                    path: path.to_str().unwrap().to_string(),
+                                    source,
+                                }),
+                                _ => Some(Error::UnableToRenameFile { source }),
+                            },
                         },
-                    },
-                    PutMode::Update(_) => unreachable!(),
-                },
+                        PutMode::Update(_) => unreachable!(),
+                    }
+                }
                 Err(source) => Some(Error::UnableToCopyDataToFile { source }),
             };
 
@@ -368,13 +381,8 @@ impl ObjectStore for LocalFileSystem {
                 return Err(err.into());
             }
 
-            let metadata = file.metadata().map_err(|e| Error::Metadata {
-                source: e.into(),
-                path: path.to_string_lossy().to_string(),
-            })?;
-
             Ok(PutResult {
-                e_tag: Some(get_etag(&metadata)),
+                e_tag,
                 version: None,
             })
         })

Reply via email to