This is an automated email from the ASF dual-hosted git repository.

hoslo pushed a commit to branch fix-close
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 55cb9f32e186523d247140686b889f1b2a2de8d2
Author: Xuanwo <[email protected]>
AuthorDate: Sun Feb 4 10:12:49 2024 +0800

    fix(services/fs,hdfs): fix poll_close when retry
---
 core/Cargo.toml                                    |  4 ++
 core/benches/vs_fs/Cargo.toml                      | 10 ++---
 core/benches/vs_s3/Cargo.toml                      | 10 ++---
 core/edge/file_write_on_full_disk/Cargo.toml       |  6 +--
 .../Cargo.toml                                     |  6 +--
 core/edge/s3_read_on_wasm/Cargo.toml               |  4 +-
 core/fuzz/Cargo.toml                               |  4 +-
 core/src/services/fs/writer.rs                     | 51 ++++++++++++++++------
 core/src/services/hdfs/writer.rs                   | 22 +++++++---
 9 files changed, 73 insertions(+), 44 deletions(-)

diff --git a/core/Cargo.toml b/core/Cargo.toml
index a8cc247c27..69d1010d16 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -34,6 +34,10 @@ version = "0.45.0"
 all-features = true
 rustdoc-args = ["--cfg", "docs"]
 
+[workspace]
+default-members = ["."]
+members = [".", "fuzz", "edge/*", "benches/vs_*"]
+
 [features]
 default = [
   "rustls",
diff --git a/core/benches/vs_fs/Cargo.toml b/core/benches/vs_fs/Cargo.toml
index ea4b77da52..291989dacf 100644
--- a/core/benches/vs_fs/Cargo.toml
+++ b/core/benches/vs_fs/Cargo.toml
@@ -17,17 +17,13 @@
 
 [package]
 description = "OpenDAL Benchmark vs fs"
+edition = "2021"
+license = "Apache-2.0"
 name = "opendal-benchmark-vs-fs"
 publish = false
+rust-version = "1.67"
 version = "0.0.0"
 
-authors.workspace = true
-edition.workspace = true
-homepage.workspace = true
-license.workspace = true
-repository.workspace = true
-rust-version.workspace = true
-
 [dependencies]
 criterion = { version = "0.5", features = ["async", "async_tokio"] }
 opendal = { path = "../..", features = ["tests"] }
diff --git a/core/benches/vs_s3/Cargo.toml b/core/benches/vs_s3/Cargo.toml
index 53f3bfcd09..4aa2ff45cb 100644
--- a/core/benches/vs_s3/Cargo.toml
+++ b/core/benches/vs_s3/Cargo.toml
@@ -17,17 +17,13 @@
 
 [package]
 description = "OpenDAL Benchmark vs s3"
+edition = "2021"
+license = "Apache-2.0"
 name = "opendal-benchmark-vs-s3"
 publish = false
+rust-version = "1.67"
 version = "0.0.0"
 
-authors.workspace = true
-edition.workspace = true
-homepage.workspace = true
-license.workspace = true
-repository.workspace = true
-rust-version.workspace = true
-
 [dependencies]
 aws-config = { version = "1.0.1", features = ["behavior-version-latest"] }
 aws-credential-types = { version = "1.0.1", features = [
diff --git a/core/edge/file_write_on_full_disk/Cargo.toml 
b/core/edge/file_write_on_full_disk/Cargo.toml
index 8a12f6583e..63a57a21a7 100644
--- a/core/edge/file_write_on_full_disk/Cargo.toml
+++ b/core/edge/file_write_on_full_disk/Cargo.toml
@@ -17,14 +17,14 @@
 
 [package]
 edition = "2021"
+license = "Apache-2.0"
 name = "edge_test_file_write_on_full_disk"
 publish = false
+rust-version = "1.67"
 version = "0.0.0"
 
-license.workspace = true
-
 [dependencies]
 futures = "0.3"
-opendal = { workspace = true }
+opendal = { path = "../../" }
 rand = "0.8"
 tokio = { version = "1", features = ["full"] }
diff --git a/core/edge/s3_aws_assume_role_with_web_identity/Cargo.toml 
b/core/edge/s3_aws_assume_role_with_web_identity/Cargo.toml
index c4842534ea..b6c1fd1c88 100644
--- a/core/edge/s3_aws_assume_role_with_web_identity/Cargo.toml
+++ b/core/edge/s3_aws_assume_role_with_web_identity/Cargo.toml
@@ -17,13 +17,13 @@
 
 [package]
 edition = "2021"
+license = "Apache-2.0"
 name = "edge_test_aws_s3_assume_role_with_web_identity"
 publish = false
+rust-version = "1.67"
 version = "0.0.0"
 
-license.workspace = true
-
 [dependencies]
-opendal = { workspace = true, features = ["tests"] }
+opendal = { path = "../../", features = ["tests"] }
 tokio = { version = "1", features = ["full"] }
 uuid = { version = "1", features = ["serde", "v4"] }
diff --git a/core/edge/s3_read_on_wasm/Cargo.toml 
b/core/edge/s3_read_on_wasm/Cargo.toml
index e479a0eee6..7698253fd0 100644
--- a/core/edge/s3_read_on_wasm/Cargo.toml
+++ b/core/edge/s3_read_on_wasm/Cargo.toml
@@ -17,12 +17,12 @@
 
 [package]
 edition = "2021"
+license = "Apache-2.0"
 name = "edge_test_s3_read_on_wasm"
 publish = false
+rust-version = "1.67"
 version = "0.0.0"
 
-license.workspace = true
-
 [lib]
 crate-type = ["cdylib"]
 
diff --git a/core/fuzz/Cargo.toml b/core/fuzz/Cargo.toml
index 2e8a9fb050..75713ab922 100644
--- a/core/fuzz/Cargo.toml
+++ b/core/fuzz/Cargo.toml
@@ -17,12 +17,12 @@
 
 [package]
 edition = "2021"
+license = "Apache-2.0"
 name = "opendal-fuzz"
 publish = false
+rust-version = "1.67"
 version = "0.0.0"
 
-license.workspace = true
-
 [package.metadata]
 cargo-fuzz = true
 
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 12e5a4fffe..a8ebb4fd3c 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -35,7 +35,7 @@ pub struct FsWriter<F> {
     tmp_path: Option<PathBuf>,
 
     f: Option<F>,
-    fut: Option<BoxFuture<'static, Result<()>>>,
+    fut: Option<BoxFuture<'static, (F, Result<()>)>>,
 }
 
 impl<F> FsWriter<F> {
@@ -69,23 +69,35 @@ impl oio::Write for FsWriter<tokio::fs::File> {
             if let Some(fut) = self.fut.as_mut() {
                 let res = ready!(fut.poll_unpin(cx));
                 self.fut = None;
-                return Poll::Ready(res);
+                if let Err(e) = res.1 {
+                    self.f = Some(res.0);
+                    return Poll::Ready(Err(e));
+                }
+                return Poll::Ready(Ok(()));
             }
 
             let mut f = self.f.take().expect("FsWriter must be initialized");
             let tmp_path = self.tmp_path.clone();
             let target_path = self.target_path.clone();
             self.fut = Some(Box::pin(async move {
-                f.flush().await.map_err(new_std_io_error)?;
-                f.sync_all().await.map_err(new_std_io_error)?;
+                if let Err(e) = f.flush().await.map_err(new_std_io_error) {
+                    // Reserve the original error for retry.
+                    return (f, Err(e));
+                }
+                if let Err(e) = f.sync_all().await.map_err(new_std_io_error) {
+                    return (f, Err(e));
+                }
 
                 if let Some(tmp_path) = &tmp_path {
-                    tokio::fs::rename(tmp_path, &target_path)
+                    if let Err(e) = tokio::fs::rename(tmp_path, &target_path)
                         .await
-                        .map_err(new_std_io_error)?;
+                        .map_err(new_std_io_error)
+                    {
+                        return (f, Err(e));
+                    }
                 }
 
-                Ok(())
+                (f, Ok(()))
             }));
         }
     }
@@ -95,21 +107,32 @@ impl oio::Write for FsWriter<tokio::fs::File> {
             if let Some(fut) = self.fut.as_mut() {
                 let res = ready!(fut.poll_unpin(cx));
                 self.fut = None;
-                return Poll::Ready(res);
+                if let Err(e) = res.1 {
+                    self.f = Some(res.0);
+                    return Poll::Ready(Err(e));
+                }
+                return Poll::Ready(Ok(()));
             }
 
-            let _ = self.f.take().expect("FsWriter must be initialized");
+            let f = self.f.take().expect("FsWriter must be initialized");
             let tmp_path = self.tmp_path.clone();
             self.fut = Some(Box::pin(async move {
                 if let Some(tmp_path) = &tmp_path {
-                    tokio::fs::remove_file(tmp_path)
+                    if let Err(e) = tokio::fs::remove_file(tmp_path)
                         .await
                         .map_err(new_std_io_error)
+                    {
+                        return (f, Err(e));
+                    }
+                    (f, Ok(()))
                 } else {
-                    Err(Error::new(
-                        ErrorKind::Unsupported,
-                        "Fs doesn't support abort if atomic_write_dir is not 
set",
-                    ))
+                    (
+                        f,
+                        Err(Error::new(
+                            ErrorKind::Unsupported,
+                            "Fs doesn't support abort if atomic_write_dir is 
not set",
+                        )),
+                    )
                 }
             }));
         }
diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs
index 6c77097d84..535e97c38b 100644
--- a/core/src/services/hdfs/writer.rs
+++ b/core/src/services/hdfs/writer.rs
@@ -36,7 +36,7 @@ pub struct HdfsWriter<F> {
     tmp_path: Option<String>,
     f: Option<F>,
     client: Arc<hdrs::Client>,
-    fut: Option<BoxFuture<'static, Result<()>>>,
+    fut: Option<BoxFuture<'static, (F, Result<()>)>>,
 }
 
 /// # Safety
@@ -76,7 +76,11 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
             if let Some(fut) = self.fut.as_mut() {
                 let res = ready!(fut.poll_unpin(cx));
                 self.fut = None;
-                return Poll::Ready(res);
+                if let Err(e) = res.1 {
+                    self.f = Some(res.0);
+                    return Poll::Ready(Err(e));
+                }
+                return Poll::Ready(Ok(()));
             }
 
             let mut f = self.f.take().expect("HdfsWriter must be initialized");
@@ -86,15 +90,21 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
             let client = self.client.clone();
 
             self.fut = Some(Box::pin(async move {
-                f.close().await.map_err(new_std_io_error)?;
+                if let Err(e) = f.close().await.map_err(new_std_io_error) {
+                    // Reserve the original error for retry.
+                    return (f, Err(e));
+                }
 
                 if let Some(tmp_path) = tmp_path {
-                    client
+                    if let Err(e) = client
                         .rename_file(&tmp_path, &target_path)
-                        .map_err(new_std_io_error)?;
+                        .map_err(new_std_io_error)
+                    {
+                        return (f, Err(e));
+                    }
                 }
 
-                Ok(())
+                (f, Ok(()))
             }));
         }
     }

Reply via email to