This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 1e46f8f08a Best effort cleanup of staged upload files (#4778) (#4792)
1e46f8f08a is described below
commit 1e46f8f08a9aa735fe581602aff66f5cc0c40b05
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Sep 8 08:43:38 2023 +0100
Best effort cleanup of staged upload files (#4778) (#4792)
* Best effort cleanup of staged upload files (#4778)
* Clippy
* Fix MSRV
---
object_store/src/local.rs | 142 ++++++++++++++++++++++++++--------------------
1 file changed, 81 insertions(+), 61 deletions(-)
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 495bb4f9c4..20eb3c63cc 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -26,6 +26,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
+use futures::ready;
use futures::{stream::BoxStream, StreamExt};
use futures::{FutureExt, TryStreamExt};
use snafu::{ensure, ResultExt, Snafu};
@@ -274,13 +275,15 @@ impl ObjectStore for LocalFileSystem {
maybe_spawn_blocking(move || {
let (mut file, suffix) = new_staged_upload(&path)?;
let staging_path = staged_upload_path(&path, &suffix);
-
file.write_all(&bytes)
- .context(UnableToCopyDataToFileSnafu)?;
-
- std::fs::rename(staging_path,
path).context(UnableToRenameFileSnafu)?;
-
- Ok(())
+ .context(UnableToCopyDataToFileSnafu)
+ .and_then(|_| {
+ std::fs::rename(&staging_path,
&path).context(UnableToRenameFileSnafu)
+ })
+ .map_err(|e| {
+ let _ = std::fs::remove_file(&staging_path); // Attempt to
cleanup
+ e.into()
+ })
})
.await
}
@@ -304,12 +307,14 @@ impl ObjectStore for LocalFileSystem {
multipart_id: &MultipartId,
) -> Result<()> {
let dest = self.config.path_to_filesystem(location)?;
- let staging_path: PathBuf = staged_upload_path(&dest, multipart_id);
+ let path: PathBuf = staged_upload_path(&dest, multipart_id);
- maybe_spawn_blocking(move || {
- std::fs::remove_file(&staging_path)
- .context(UnableToDeleteFileSnafu { path: staging_path })?;
- Ok(())
+ maybe_spawn_blocking(move || match std::fs::remove_file(&path) {
+ Ok(_) => Ok(()),
+ Err(source) => match source.kind() {
+ ErrorKind::NotFound => Ok(()), // Already deleted
+ _ => Err(Error::UnableToDeleteFile { path, source }.into()),
+ },
})
.await
}
@@ -318,7 +323,6 @@ impl ObjectStore for LocalFileSystem {
&self,
location: &Path,
) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
- #[cfg(not(target_arch = "wasm32"))]
// Get the path to the file from the configuration.
let path = self.config.path_to_filesystem(location)?;
loop {
@@ -358,8 +362,6 @@ impl ObjectStore for LocalFileSystem {
}
}
}
- #[cfg(target_arch = "wasm32")]
- Err(super::Error::NotImplemented)
}
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
@@ -597,8 +599,9 @@ impl ObjectStore for LocalFileSystem {
match std::fs::hard_link(&from, &staged) {
Ok(_) => {
return std::fs::rename(&staged, &to).map_err(|source| {
+ let _ = std::fs::remove_file(&staged); // Attempt to
clean up
Error::UnableToCopyFile { from, to, source }.into()
- })
+ });
}
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => id += 1,
@@ -690,12 +693,9 @@ fn staged_upload_path(dest: &std::path::Path, suffix:
&str) -> PathBuf {
enum LocalUploadState {
/// Upload is ready to send new data
- Idle(Arc<std::fs::File>),
+ Idle(Arc<File>),
/// In the middle of a write
- Writing(
- Arc<std::fs::File>,
- BoxFuture<'static, Result<usize, io::Error>>,
- ),
+ Writing(Arc<File>, BoxFuture<'static, Result<usize, io::Error>>),
/// In the middle of syncing data and closing file.
///
/// Future will contain last reference to file, so it will call drop on
completion.
@@ -713,11 +713,7 @@ struct LocalUpload {
}
impl LocalUpload {
- pub fn new(
- dest: PathBuf,
- multipart_id: MultipartId,
- file: Arc<std::fs::File>,
- ) -> Self {
+ pub fn new(dest: PathBuf, multipart_id: MultipartId, file: Arc<File>) ->
Self {
Self {
inner_state: LocalUploadState::Idle(file),
dest,
@@ -731,14 +727,13 @@ impl AsyncWrite for LocalUpload {
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
- ) -> std::task::Poll<Result<usize, io::Error>> {
- let invalid_state =
- |condition: &str| -> std::task::Poll<Result<usize, io::Error>> {
- Poll::Ready(Err(io::Error::new(
- io::ErrorKind::InvalidInput,
- format!("Tried to write to file {condition}."),
- )))
- };
+ ) -> Poll<Result<usize, io::Error>> {
+ let invalid_state = |condition: &str| -> Poll<Result<usize,
io::Error>> {
+ Poll::Ready(Err(io::Error::new(
+ ErrorKind::InvalidInput,
+ format!("Tried to write to file {condition}."),
+ )))
+ };
if let Ok(runtime) = tokio::runtime::Handle::try_current() {
let mut data: Vec<u8> = buf.to_vec();
@@ -757,7 +752,7 @@ impl AsyncWrite for LocalUpload {
.spawn_blocking(move ||
(&*file2).write_all(&data))
.map(move |res| match res {
Err(err) => {
-
Err(io::Error::new(io::ErrorKind::Other, err))
+
Err(io::Error::new(ErrorKind::Other, err))
}
Ok(res) => res.map(move |_| data_len),
}),
@@ -765,16 +760,9 @@ impl AsyncWrite for LocalUpload {
);
}
LocalUploadState::Writing(file, inner_write) => {
- match inner_write.poll_unpin(cx) {
- Poll::Ready(res) => {
- self.inner_state =
- LocalUploadState::Idle(Arc::clone(file));
- return Poll::Ready(res);
- }
- Poll::Pending => {
- return Poll::Pending;
- }
- }
+ let res = ready!(inner_write.poll_unpin(cx));
+ self.inner_state =
LocalUploadState::Idle(Arc::clone(file));
+ return Poll::Ready(res);
}
LocalUploadState::ShuttingDown(_) => {
return invalid_state("when writer is shutting down");
@@ -800,14 +788,14 @@ impl AsyncWrite for LocalUpload {
fn poll_flush(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Result<(), io::Error>> {
+ ) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Result<(), io::Error>> {
+ ) -> Poll<Result<(), io::Error>> {
if let Ok(runtime) = tokio::runtime::Handle::try_current() {
loop {
match &mut self.inner_state {
@@ -854,13 +842,11 @@ impl AsyncWrite for LocalUpload {
"Tried to commit a file where a write is in
progress.",
)));
}
- LocalUploadState::Committing(fut) => match
fut.poll_unpin(cx) {
- Poll::Ready(res) => {
- self.inner_state = LocalUploadState::Complete;
- return Poll::Ready(res);
- }
- Poll::Pending => return Poll::Pending,
- },
+ LocalUploadState::Committing(fut) => {
+ let res = ready!(fut.poll_unpin(cx));
+ self.inner_state = LocalUploadState::Complete;
+ return Poll::Ready(res);
+ }
LocalUploadState::Complete => {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
@@ -876,22 +862,36 @@ impl AsyncWrite for LocalUpload {
let file = Arc::clone(file);
self.inner_state = LocalUploadState::Complete;
file.sync_all()?;
- std::mem::drop(file);
+ drop(file);
std::fs::rename(staging_path, &self.dest)?;
Poll::Ready(Ok(()))
}
_ => {
// If we are running on this thread, then only possible
states are Idle and Complete.
- Poll::Ready(Err(io::Error::new(
- io::ErrorKind::Other,
- "Already complete",
- )))
+ Poll::Ready(Err(io::Error::new(ErrorKind::Other, "Already
complete")))
}
}
}
}
}
+impl Drop for LocalUpload {
+ fn drop(&mut self) {
+ match self.inner_state {
+ LocalUploadState::Complete => (),
+ _ => {
+ self.inner_state = LocalUploadState::Complete;
+ let path = staged_upload_path(&self.dest, &self.multipart_id);
+ // Try to cleanup intermediate file ignoring any error
+ match tokio::runtime::Handle::try_current() {
+ Ok(r) => drop(r.spawn_blocking(move ||
std::fs::remove_file(path))),
+ Err(_) => drop(std::fs::remove_file(path)),
+ };
+ }
+ }
+ }
+}
+
pub(crate) fn chunked_stream(
mut file: File,
path: PathBuf,
@@ -1018,8 +1018,8 @@ fn convert_metadata(metadata: Metadata, location: Path)
-> Result<ObjectMeta> {
/// Convert walkdir results and converts not-found errors into `None`.
/// Convert broken symlinks to `None`.
fn convert_walkdir_result(
- res: std::result::Result<walkdir::DirEntry, walkdir::Error>,
-) -> Result<Option<walkdir::DirEntry>> {
+ res: std::result::Result<DirEntry, walkdir::Error>,
+) -> Result<Option<DirEntry>> {
match res {
Ok(entry) => {
// To check for broken symlink: call symlink_metadata() - it does
not traverse symlinks);
@@ -1048,7 +1048,7 @@ fn convert_walkdir_result(
Err(walkdir_err) => match walkdir_err.io_error() {
Some(io_err) => match io_err.kind() {
- io::ErrorKind::NotFound => Ok(None),
+ ErrorKind::NotFound => Ok(None),
_ => Err(Error::UnableToWalkDir {
source: walkdir_err,
}
@@ -1476,6 +1476,7 @@ mod not_wasm_tests {
use crate::local::LocalFileSystem;
use crate::{ObjectStore, Path};
use bytes::Bytes;
+ use std::time::Duration;
use tempfile::TempDir;
use tokio::io::AsyncWriteExt;
@@ -1560,6 +1561,25 @@ mod not_wasm_tests {
let expected_data = Bytes::from("arbitrarydatagnzarbitrarydatagnz");
assert_eq!(&*read_data, expected_data);
}
+
+ #[tokio::test]
+ async fn test_cleanup_intermediate_files() {
+ let root = TempDir::new().unwrap();
+ let integration =
LocalFileSystem::new_with_prefix(root.path()).unwrap();
+
+ let location = Path::from("some_file");
+ let (_, mut writer) =
integration.put_multipart(&location).await.unwrap();
+ writer.write_all(b"hello").await.unwrap();
+
+ let file_count = std::fs::read_dir(root.path()).unwrap().count();
+ assert_eq!(file_count, 1);
+ drop(writer);
+
+ tokio::time::sleep(Duration::from_millis(1)).await;
+
+ let file_count = std::fs::read_dir(root.path()).unwrap().count();
+ assert_eq!(file_count, 0);
+ }
}
#[cfg(target_family = "unix")]