This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new d5a6cf4e5a Fix ObjectStore.LocalFileSystem.put_opts for blobfuse (#5094)
d5a6cf4e5a is described below
commit d5a6cf4e5aaf9a4c5f2777c81aea9ef315578b2d
Author: Robin Lin <[email protected]>
AuthorDate: Mon Nov 27 19:07:45 2023 +0800
Fix ObjectStore.LocalFileSystem.put_opts for blobfuse (#5094)
* Fix ObjectStore.LocalFileSystem.put_opts for blobfuse
* Fix ObjectStore.LocalFileSystem.put_opts for blobfuse
* fix comment
* fix race condition
* add comment
---
object_store/src/local.rs | 56 +++++++++++++++++++++++++++--------------------
1 file changed, 32 insertions(+), 24 deletions(-)
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index dd71d9ec12..71b96f058c 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -338,28 +338,41 @@ impl ObjectStore for LocalFileSystem {
maybe_spawn_blocking(move || {
let (mut file, suffix) = new_staged_upload(&path)?;
let staging_path = staged_upload_path(&path, &suffix);
+ let mut e_tag = None;
let err = match file.write_all(&bytes) {
- Ok(_) => match opts.mode {
- PutMode::Overwrite => match std::fs::rename(&staging_path,
&path) {
- Ok(_) => None,
- Err(source) => Some(Error::UnableToRenameFile { source
}),
- },
- PutMode::Create => match std::fs::hard_link(&staging_path,
&path) {
- Ok(_) => {
- let _ = std::fs::remove_file(&staging_path); //
Attempt to cleanup
- None
+ Ok(_) => {
+ let metadata = file.metadata().map_err(|e| Error::Metadata
{
+ source: e.into(),
+ path: path.to_string_lossy().to_string(),
+ })?;
+ e_tag = Some(get_etag(&metadata));
+ match opts.mode {
+ PutMode::Overwrite => {
+ // For some fuse types of file systems, the file
must be closed first
+ // to trigger the upload operation, and then
renamed, such as Blobfuse
+ std::mem::drop(file);
+ match std::fs::rename(&staging_path, &path) {
+ Ok(_) => None,
+ Err(source) => Some(Error::UnableToRenameFile
{ source }),
+ }
}
- Err(source) => match source.kind() {
- ErrorKind::AlreadyExists =>
Some(Error::AlreadyExists {
- path: path.to_str().unwrap().to_string(),
- source,
- }),
- _ => Some(Error::UnableToRenameFile { source }),
+ PutMode::Create => match
std::fs::hard_link(&staging_path, &path) {
+ Ok(_) => {
+ let _ = std::fs::remove_file(&staging_path);
// Attempt to cleanup
+ None
+ }
+ Err(source) => match source.kind() {
+ ErrorKind::AlreadyExists =>
Some(Error::AlreadyExists {
+ path: path.to_str().unwrap().to_string(),
+ source,
+ }),
+ _ => Some(Error::UnableToRenameFile { source
}),
+ },
},
- },
- PutMode::Update(_) => unreachable!(),
- },
+ PutMode::Update(_) => unreachable!(),
+ }
+ }
Err(source) => Some(Error::UnableToCopyDataToFile { source }),
};
@@ -368,13 +381,8 @@ impl ObjectStore for LocalFileSystem {
return Err(err.into());
}
- let metadata = file.metadata().map_err(|e| Error::Metadata {
- source: e.into(),
- path: path.to_string_lossy().to_string(),
- })?;
-
Ok(PutResult {
- e_tag: Some(get_etag(&metadata)),
+ e_tag,
version: None,
})
})