This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e46f8f08a Best effort cleanup of staged upload files (#4778) (#4792)
1e46f8f08a is described below

commit 1e46f8f08a9aa735fe581602aff66f5cc0c40b05
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Sep 8 08:43:38 2023 +0100

    Best effort cleanup of staged upload files (#4778) (#4792)
    
    * Best effort cleanup of staged upload files (#4778)
    
    * Clippy
    
    * Fix MSRV
---
 object_store/src/local.rs | 142 ++++++++++++++++++++++++++--------------------
 1 file changed, 81 insertions(+), 61 deletions(-)

diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 495bb4f9c4..20eb3c63cc 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -26,6 +26,7 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
 use futures::future::BoxFuture;
+use futures::ready;
 use futures::{stream::BoxStream, StreamExt};
 use futures::{FutureExt, TryStreamExt};
 use snafu::{ensure, ResultExt, Snafu};
@@ -274,13 +275,15 @@ impl ObjectStore for LocalFileSystem {
         maybe_spawn_blocking(move || {
             let (mut file, suffix) = new_staged_upload(&path)?;
             let staging_path = staged_upload_path(&path, &suffix);
-
             file.write_all(&bytes)
-                .context(UnableToCopyDataToFileSnafu)?;
-
-            std::fs::rename(staging_path, path).context(UnableToRenameFileSnafu)?;
-
-            Ok(())
+                .context(UnableToCopyDataToFileSnafu)
+                .and_then(|_| {
+                    std::fs::rename(&staging_path, &path).context(UnableToRenameFileSnafu)
+                })
+                .map_err(|e| {
+                    let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
+                    e.into()
+                })
         })
         .await
     }
@@ -304,12 +307,14 @@ impl ObjectStore for LocalFileSystem {
         multipart_id: &MultipartId,
     ) -> Result<()> {
         let dest = self.config.path_to_filesystem(location)?;
-        let staging_path: PathBuf = staged_upload_path(&dest, multipart_id);
+        let path: PathBuf = staged_upload_path(&dest, multipart_id);
 
-        maybe_spawn_blocking(move || {
-            std::fs::remove_file(&staging_path)
-                .context(UnableToDeleteFileSnafu { path: staging_path })?;
-            Ok(())
+        maybe_spawn_blocking(move || match std::fs::remove_file(&path) {
+            Ok(_) => Ok(()),
+            Err(source) => match source.kind() {
+                ErrorKind::NotFound => Ok(()), // Already deleted
+                _ => Err(Error::UnableToDeleteFile { path, source }.into()),
+            },
         })
         .await
     }
@@ -318,7 +323,6 @@ impl ObjectStore for LocalFileSystem {
         &self,
         location: &Path,
     ) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
-        #[cfg(not(target_arch = "wasm32"))]
         // Get the path to the file from the configuration.
         let path = self.config.path_to_filesystem(location)?;
         loop {
@@ -358,8 +362,6 @@ impl ObjectStore for LocalFileSystem {
                 }
             }
         }
-        #[cfg(target_arch = "wasm32")]
-        Err(super::Error::NotImplemented)
     }
 
     async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
@@ -597,8 +599,9 @@ impl ObjectStore for LocalFileSystem {
             match std::fs::hard_link(&from, &staged) {
                 Ok(_) => {
                     return std::fs::rename(&staged, &to).map_err(|source| {
+                        let _ = std::fs::remove_file(&staged); // Attempt to clean up
                         Error::UnableToCopyFile { from, to, source }.into()
-                    })
+                    });
                 }
                 Err(source) => match source.kind() {
                     ErrorKind::AlreadyExists => id += 1,
@@ -690,12 +693,9 @@ fn staged_upload_path(dest: &std::path::Path, suffix: &str) -> PathBuf {
 
 enum LocalUploadState {
     /// Upload is ready to send new data
-    Idle(Arc<std::fs::File>),
+    Idle(Arc<File>),
     /// In the middle of a write
-    Writing(
-        Arc<std::fs::File>,
-        BoxFuture<'static, Result<usize, io::Error>>,
-    ),
+    Writing(Arc<File>, BoxFuture<'static, Result<usize, io::Error>>),
     /// In the middle of syncing data and closing file.
     ///
     /// Future will contain last reference to file, so it will call drop on completion.
@@ -713,11 +713,7 @@ struct LocalUpload {
 }
 
 impl LocalUpload {
-    pub fn new(
-        dest: PathBuf,
-        multipart_id: MultipartId,
-        file: Arc<std::fs::File>,
-    ) -> Self {
+    pub fn new(dest: PathBuf, multipart_id: MultipartId, file: Arc<File>) -> Self {
         Self {
             inner_state: LocalUploadState::Idle(file),
             dest,
@@ -731,14 +727,13 @@ impl AsyncWrite for LocalUpload {
         mut self: Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
         buf: &[u8],
-    ) -> std::task::Poll<Result<usize, io::Error>> {
-        let invalid_state =
-            |condition: &str| -> std::task::Poll<Result<usize, io::Error>> {
-                Poll::Ready(Err(io::Error::new(
-                    io::ErrorKind::InvalidInput,
-                    format!("Tried to write to file {condition}."),
-                )))
-            };
+    ) -> Poll<Result<usize, io::Error>> {
+        let invalid_state = |condition: &str| -> Poll<Result<usize, io::Error>> {
+            Poll::Ready(Err(io::Error::new(
+                ErrorKind::InvalidInput,
+                format!("Tried to write to file {condition}."),
+            )))
+        };
 
         if let Ok(runtime) = tokio::runtime::Handle::try_current() {
             let mut data: Vec<u8> = buf.to_vec();
@@ -757,7 +752,7 @@ impl AsyncWrite for LocalUpload {
                                     .spawn_blocking(move || (&*file2).write_all(&data))
                                     .map(move |res| match res {
                                         Err(err) => {
-                                            Err(io::Error::new(io::ErrorKind::Other, err))
+                                            Err(io::Error::new(ErrorKind::Other, err))
                                         }
                                         Ok(res) => res.map(move |_| data_len),
                                     }),
@@ -765,16 +760,9 @@ impl AsyncWrite for LocalUpload {
                         );
                     }
                     LocalUploadState::Writing(file, inner_write) => {
-                        match inner_write.poll_unpin(cx) {
-                            Poll::Ready(res) => {
-                                self.inner_state =
-                                    LocalUploadState::Idle(Arc::clone(file));
-                                return Poll::Ready(res);
-                            }
-                            Poll::Pending => {
-                                return Poll::Pending;
-                            }
-                        }
+                        let res = ready!(inner_write.poll_unpin(cx));
+                        self.inner_state = LocalUploadState::Idle(Arc::clone(file));
+                        return Poll::Ready(res);
                     }
                     LocalUploadState::ShuttingDown(_) => {
                         return invalid_state("when writer is shutting down");
@@ -800,14 +788,14 @@ impl AsyncWrite for LocalUpload {
     fn poll_flush(
         self: Pin<&mut Self>,
         _cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Result<(), io::Error>> {
+    ) -> Poll<Result<(), io::Error>> {
         Poll::Ready(Ok(()))
     }
 
     fn poll_shutdown(
         mut self: Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Result<(), io::Error>> {
+    ) -> Poll<Result<(), io::Error>> {
         if let Ok(runtime) = tokio::runtime::Handle::try_current() {
             loop {
                 match &mut self.inner_state {
@@ -854,13 +842,11 @@ impl AsyncWrite for LocalUpload {
                             "Tried to commit a file where a write is in progress.",
                         )));
                     }
-                    LocalUploadState::Committing(fut) => match fut.poll_unpin(cx) {
-                        Poll::Ready(res) => {
-                            self.inner_state = LocalUploadState::Complete;
-                            return Poll::Ready(res);
-                        }
-                        Poll::Pending => return Poll::Pending,
-                    },
+                    LocalUploadState::Committing(fut) => {
+                        let res = ready!(fut.poll_unpin(cx));
+                        self.inner_state = LocalUploadState::Complete;
+                        return Poll::Ready(res);
+                    }
                     LocalUploadState::Complete => {
                         return Poll::Ready(Err(io::Error::new(
                             io::ErrorKind::Other,
@@ -876,22 +862,36 @@ impl AsyncWrite for LocalUpload {
                     let file = Arc::clone(file);
                     self.inner_state = LocalUploadState::Complete;
                     file.sync_all()?;
-                    std::mem::drop(file);
+                    drop(file);
                     std::fs::rename(staging_path, &self.dest)?;
                     Poll::Ready(Ok(()))
                 }
                 _ => {
                     // If we are running on this thread, then only possible states are Idle and Complete.
-                    Poll::Ready(Err(io::Error::new(
-                        io::ErrorKind::Other,
-                        "Already complete",
-                    )))
+                    Poll::Ready(Err(io::Error::new(ErrorKind::Other, "Already complete")))
                 }
             }
         }
     }
 }
 
+impl Drop for LocalUpload {
+    fn drop(&mut self) {
+        match self.inner_state {
+            LocalUploadState::Complete => (),
+            _ => {
+                self.inner_state = LocalUploadState::Complete;
+                let path = staged_upload_path(&self.dest, &self.multipart_id);
+                // Try to cleanup intermediate file ignoring any error
+                match tokio::runtime::Handle::try_current() {
+                    Ok(r) => drop(r.spawn_blocking(move || std::fs::remove_file(path))),
+                    Err(_) => drop(std::fs::remove_file(path)),
+                };
+            }
+        }
+    }
+}
+
 pub(crate) fn chunked_stream(
     mut file: File,
     path: PathBuf,
@@ -1018,8 +1018,8 @@ fn convert_metadata(metadata: Metadata, location: Path) -> Result<ObjectMeta> {
 /// Convert walkdir results and converts not-found errors into `None`.
 /// Convert broken symlinks to `None`.
 fn convert_walkdir_result(
-    res: std::result::Result<walkdir::DirEntry, walkdir::Error>,
-) -> Result<Option<walkdir::DirEntry>> {
+    res: std::result::Result<DirEntry, walkdir::Error>,
+) -> Result<Option<DirEntry>> {
     match res {
         Ok(entry) => {
             // To check for broken symlink: call symlink_metadata() - it does not traverse symlinks);
@@ -1048,7 +1048,7 @@ fn convert_walkdir_result(
 
         Err(walkdir_err) => match walkdir_err.io_error() {
             Some(io_err) => match io_err.kind() {
-                io::ErrorKind::NotFound => Ok(None),
+                ErrorKind::NotFound => Ok(None),
                 _ => Err(Error::UnableToWalkDir {
                     source: walkdir_err,
                 }
@@ -1476,6 +1476,7 @@ mod not_wasm_tests {
     use crate::local::LocalFileSystem;
     use crate::{ObjectStore, Path};
     use bytes::Bytes;
+    use std::time::Duration;
     use tempfile::TempDir;
     use tokio::io::AsyncWriteExt;
 
@@ -1560,6 +1561,25 @@ mod not_wasm_tests {
         let expected_data = Bytes::from("arbitrarydatagnzarbitrarydatagnz");
         assert_eq!(&*read_data, expected_data);
     }
+
+    #[tokio::test]
+    async fn test_cleanup_intermediate_files() {
+        let root = TempDir::new().unwrap();
+        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
+
+        let location = Path::from("some_file");
+        let (_, mut writer) = integration.put_multipart(&location).await.unwrap();
+        writer.write_all(b"hello").await.unwrap();
+
+        let file_count = std::fs::read_dir(root.path()).unwrap().count();
+        assert_eq!(file_count, 1);
+        drop(writer);
+
+        tokio::time::sleep(Duration::from_millis(1)).await;
+
+        let file_count = std::fs::read_dir(root.path()).unwrap().count();
+        assert_eq!(file_count, 0);
+    }
 }
 
 #[cfg(target_family = "unix")]

Reply via email to