This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 93a6360d1 make sure CompleteWriter is enforcing the right invariants 
(#7056)
93a6360d1 is described below

commit 93a6360d158f12c9085c4f065d6909abaed27cf8
Author: Carl Sverre <[email protected]>
AuthorDate: Sun Jan 18 23:12:00 2026 -0800

    make sure CompleteWriter is enforcing the right invariants (#7056)
    
    Make the CompletionWriter enforce more precise invariants
---
 core/core/src/layers/complete.rs | 58 +++++++++++++++++++++++++++++++++++-----
 1 file changed, 52 insertions(+), 6 deletions(-)

diff --git a/core/core/src/layers/complete.rs b/core/core/src/layers/complete.rs
index 75fa9ee09..d9cf1a403 100644
--- a/core/core/src/layers/complete.rs
+++ b/core/core/src/layers/complete.rs
@@ -153,10 +153,32 @@ impl<R: oio::Read> oio::Read for CompleteReader<R> {
     }
 }
 
+/// Tracks the state of the Write operation.
+/// A successful operation goes through states: Open -> Written -> Closed
+/// A failed operation terminates in the Error state
+#[derive(Debug, PartialEq, Eq)]
+enum CompleteState {
+    Open,
+    Written,
+    Closed,
+    Error,
+}
+
+impl CompleteState {
+    /// Attempt to transition to the destination state. Once CompleteState has
+    /// errored all further transitions are ignored.
+    fn transition(&mut self, destination: CompleteState) {
+        if *self != CompleteState::Error {
+            *self = destination
+        }
+    }
+}
+
 pub struct CompleteWriter<W> {
     inner: Option<W>,
     append: bool,
     size: u64,
+    state: CompleteState,
 }
 
 impl<W> CompleteWriter<W> {
@@ -165,6 +187,7 @@ impl<W> CompleteWriter<W> {
             inner: Some(inner),
             append,
             size: 0,
+            state: CompleteState::Open,
         }
     }
 
@@ -194,8 +217,10 @@ impl<W> CompleteWriter<W> {
 #[cfg(debug_assertions)]
 impl<W> Drop for CompleteWriter<W> {
     fn drop(&mut self) {
-        if self.inner.is_some() {
-            log::warn!("writer has not been closed or aborted, must be a bug")
+        if self.state == CompleteState::Written {
+            log::warn!(
+                "writer has not been closed or aborted after successful write 
operation, must be a bug"
+            )
         }
     }
 }
@@ -206,29 +231,47 @@ where
 {
     async fn write(&mut self, bs: Buffer) -> Result<()> {
         let w = self.inner.as_mut().ok_or_else(|| {
+            debug_assert_ne!(
+                self.state,
+                CompleteState::Open,
+                "bug: inner is empty, but state is Open"
+            );
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
 
         let len = bs.len();
-        w.write(bs).await?;
+        w.write(bs)
+            .await
+            .inspect_err(|_| self.state.transition(CompleteState::Error))?;
         self.size += len as u64;
+        self.state.transition(CompleteState::Written);
 
         Ok(())
     }
 
     async fn close(&mut self) -> Result<Metadata> {
         let w = self.inner.as_mut().ok_or_else(|| {
+            debug_assert_ne!(
+                self.state,
+                CompleteState::Open,
+                "bug: inner is empty, but state is Open"
+            );
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
 
         // we must return `Err` before setting inner to None; otherwise,
         // we won't be able to retry `close` in `RetryLayer`.
-        let mut ret = w.close().await?;
-        self.check(ret.content_length())?;
+        let mut ret = w
+            .close()
+            .await
+            .inspect_err(|_| self.state.transition(CompleteState::Error))?;
+        self.check(ret.content_length())
+            .inspect_err(|_| self.state.transition(CompleteState::Error))?;
         if ret.content_length() == 0 {
             ret = ret.with_content_length(self.size);
         }
         self.inner = None;
+        self.state.transition(CompleteState::Closed);
 
         Ok(ret)
     }
@@ -238,8 +281,11 @@ where
             Error::new(ErrorKind::Unexpected, "writer has been closed or 
aborted")
         })?;
 
-        w.abort().await?;
+        w.abort()
+            .await
+            .inspect_err(|_| self.state.transition(CompleteState::Error))?;
         self.inner = None;
+        self.state.transition(CompleteState::Closed);
 
         Ok(())
     }

Reply via email to