This is an automated email from the ASF dual-hosted git repository. hoslo pushed a commit to branch fix-close in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 55cb9f32e186523d247140686b889f1b2a2de8d2 Author: Xuanwo <[email protected]> AuthorDate: Sun Feb 4 10:12:49 2024 +0800 fix(services/fs,hdfs): fix poll_close when retry --- core/Cargo.toml | 4 ++ core/benches/vs_fs/Cargo.toml | 10 ++--- core/benches/vs_s3/Cargo.toml | 10 ++--- core/edge/file_write_on_full_disk/Cargo.toml | 6 +-- .../Cargo.toml | 6 +-- core/edge/s3_read_on_wasm/Cargo.toml | 4 +- core/fuzz/Cargo.toml | 4 +- core/src/services/fs/writer.rs | 51 ++++++++++++++++------ core/src/services/hdfs/writer.rs | 22 +++++++--- 9 files changed, 73 insertions(+), 44 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index a8cc247c27..69d1010d16 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -34,6 +34,10 @@ version = "0.45.0" all-features = true rustdoc-args = ["--cfg", "docs"] +[workspace] +default-members = ["."] +members = [".", "fuzz", "edge/*", "benches/vs_*"] + [features] default = [ "rustls", diff --git a/core/benches/vs_fs/Cargo.toml b/core/benches/vs_fs/Cargo.toml index ea4b77da52..291989dacf 100644 --- a/core/benches/vs_fs/Cargo.toml +++ b/core/benches/vs_fs/Cargo.toml @@ -17,17 +17,13 @@ [package] description = "OpenDAL Benchmark vs fs" +edition = "2021" +license = "Apache-2.0" name = "opendal-benchmark-vs-fs" publish = false +rust-version = "1.67" version = "0.0.0" -authors.workspace = true -edition.workspace = true -homepage.workspace = true -license.workspace = true -repository.workspace = true -rust-version.workspace = true - [dependencies] criterion = { version = "0.5", features = ["async", "async_tokio"] } opendal = { path = "../..", features = ["tests"] } diff --git a/core/benches/vs_s3/Cargo.toml b/core/benches/vs_s3/Cargo.toml index 53f3bfcd09..4aa2ff45cb 100644 --- a/core/benches/vs_s3/Cargo.toml +++ b/core/benches/vs_s3/Cargo.toml @@ -17,17 +17,13 @@ [package] description = "OpenDAL Benchmark vs s3" +edition = "2021" +license = "Apache-2.0" name = "opendal-benchmark-vs-s3" publish = false +rust-version = "1.67" version = "0.0.0" -authors.workspace = true -edition.workspace = true -homepage.workspace = true -license.workspace = true -repository.workspace = true -rust-version.workspace = true - [dependencies] aws-config = { version = "1.0.1", features = ["behavior-version-latest"] } aws-credential-types = { version = "1.0.1", features = [ diff --git a/core/edge/file_write_on_full_disk/Cargo.toml b/core/edge/file_write_on_full_disk/Cargo.toml index 8a12f6583e..63a57a21a7 100644 --- a/core/edge/file_write_on_full_disk/Cargo.toml +++ b/core/edge/file_write_on_full_disk/Cargo.toml @@ -17,14 +17,14 @@ [package] edition = "2021" +license = "Apache-2.0" name = "edge_test_file_write_on_full_disk" publish = false +rust-version = "1.67" version = "0.0.0" -license.workspace = true - [dependencies] futures = "0.3" -opendal = { workspace = true } +opendal = { path = "../../" } rand = "0.8" tokio = { version = "1", features = ["full"] } diff --git a/core/edge/s3_aws_assume_role_with_web_identity/Cargo.toml b/core/edge/s3_aws_assume_role_with_web_identity/Cargo.toml index c4842534ea..b6c1fd1c88 100644 --- a/core/edge/s3_aws_assume_role_with_web_identity/Cargo.toml +++ b/core/edge/s3_aws_assume_role_with_web_identity/Cargo.toml @@ -17,13 +17,13 @@ [package] edition = "2021" +license = "Apache-2.0" name = "edge_test_aws_s3_assume_role_with_web_identity" publish = false +rust-version = "1.67" version = "0.0.0" -license.workspace = true - [dependencies] -opendal = { workspace = true, features = ["tests"] } +opendal = { path = "../../", features = ["tests"] } tokio = { version = "1", features = ["full"] } uuid = { version = "1", features = ["serde", "v4"] } diff --git a/core/edge/s3_read_on_wasm/Cargo.toml b/core/edge/s3_read_on_wasm/Cargo.toml index e479a0eee6..7698253fd0 100644 --- a/core/edge/s3_read_on_wasm/Cargo.toml +++ b/core/edge/s3_read_on_wasm/Cargo.toml @@ -17,12 +17,12 @@ [package] edition = "2021" +license = "Apache-2.0" name = "edge_test_s3_read_on_wasm" publish = false +rust-version = "1.67" version = "0.0.0" -license.workspace = true - [lib] crate-type = ["cdylib"] diff --git a/core/fuzz/Cargo.toml b/core/fuzz/Cargo.toml index 2e8a9fb050..75713ab922 100644 --- a/core/fuzz/Cargo.toml +++ b/core/fuzz/Cargo.toml @@ -17,12 +17,12 @@ [package] edition = "2021" +license = "Apache-2.0" name = "opendal-fuzz" publish = false +rust-version = "1.67" version = "0.0.0" -license.workspace = true - [package.metadata] cargo-fuzz = true diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 12e5a4fffe..a8ebb4fd3c 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -35,7 +35,7 @@ pub struct FsWriter<F> { tmp_path: Option<PathBuf>, f: Option<F>, - fut: Option<BoxFuture<'static, Result<()>>>, + fut: Option<BoxFuture<'static, (F, Result<()>)>>, } impl<F> FsWriter<F> { @@ -69,23 +69,35 @@ impl oio::Write for FsWriter<tokio::fs::File> { if let Some(fut) = self.fut.as_mut() { let res = ready!(fut.poll_unpin(cx)); self.fut = None; - return Poll::Ready(res); + if let Err(e) = res.1 { + self.f = Some(res.0); + return Poll::Ready(Err(e)); + } + return Poll::Ready(Ok(())); } let mut f = self.f.take().expect("FsWriter must be initialized"); let tmp_path = self.tmp_path.clone(); let target_path = self.target_path.clone(); self.fut = Some(Box::pin(async move { - f.flush().await.map_err(new_std_io_error)?; - f.sync_all().await.map_err(new_std_io_error)?; + if let Err(e) = f.flush().await.map_err(new_std_io_error) { + // Reserve the original error for retry. + return (f, Err(e)); + } + if let Err(e) = f.sync_all().await.map_err(new_std_io_error) { + return (f, Err(e)); + } if let Some(tmp_path) = &tmp_path { - tokio::fs::rename(tmp_path, &target_path) + if let Err(e) = tokio::fs::rename(tmp_path, &target_path) .await - .map_err(new_std_io_error)?; + .map_err(new_std_io_error) + { + return (f, Err(e)); + } } - Ok(()) + (f, Ok(())) })); } } @@ -95,21 +107,32 @@ impl oio::Write for FsWriter<tokio::fs::File> { if let Some(fut) = self.fut.as_mut() { let res = ready!(fut.poll_unpin(cx)); self.fut = None; - return Poll::Ready(res); + if let Err(e) = res.1 { + self.f = Some(res.0); + return Poll::Ready(Err(e)); + } + return Poll::Ready(Ok(())); } - let _ = self.f.take().expect("FsWriter must be initialized"); + let f = self.f.take().expect("FsWriter must be initialized"); let tmp_path = self.tmp_path.clone(); self.fut = Some(Box::pin(async move { if let Some(tmp_path) = &tmp_path { - tokio::fs::remove_file(tmp_path) + if let Err(e) = tokio::fs::remove_file(tmp_path) .await .map_err(new_std_io_error) + { + return (f, Err(e)); + } + (f, Ok(())) } else { - Err(Error::new( - ErrorKind::Unsupported, - "Fs doesn't support abort if atomic_write_dir is not set", - )) + ( + f, + Err(Error::new( + ErrorKind::Unsupported, + "Fs doesn't support abort if atomic_write_dir is not set", + )), + ) } })); } diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 6c77097d84..535e97c38b 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -36,7 +36,7 @@ pub struct HdfsWriter<F> { tmp_path: Option<String>, f: Option<F>, client: Arc<hdrs::Client>, - fut: Option<BoxFuture<'static, Result<()>>>, + fut: Option<BoxFuture<'static, (F, Result<()>)>>, } /// # Safety @@ -76,7 +76,11 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> { if let Some(fut) = self.fut.as_mut() { let res = ready!(fut.poll_unpin(cx)); self.fut = None; - return Poll::Ready(res); + if let Err(e) = res.1 { + self.f = Some(res.0); + return Poll::Ready(Err(e)); + } + return Poll::Ready(Ok(())); } let mut f = self.f.take().expect("HdfsWriter must be initialized"); @@ -86,15 +90,21 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> { let client = self.client.clone(); self.fut = Some(Box::pin(async move { - f.close().await.map_err(new_std_io_error)?; + if let Err(e) = f.close().await.map_err(new_std_io_error) { + // Reserve the original error for retry. + return (f, Err(e)); + } if let Some(tmp_path) = tmp_path { - client + if let Err(e) = client .rename_file(&tmp_path, &target_path) - .map_err(new_std_io_error)?; + .map_err(new_std_io_error) + { + return (f, Err(e)); + } } - Ok(()) + (f, Ok(())) })); } }
