This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 93a6360d1 make sure CompleteWriter is enforcing the right invariants
(#7056)
93a6360d1 is described below
commit 93a6360d158f12c9085c4f065d6909abaed27cf8
Author: Carl Sverre <[email protected]>
AuthorDate: Sun Jan 18 23:12:00 2026 -0800
make sure CompleteWriter is enforcing the right invariants (#7056)
Make the CompletionWriter enforce more precise invariants
---
core/core/src/layers/complete.rs | 58 +++++++++++++++++++++++++++++++++++-----
1 file changed, 52 insertions(+), 6 deletions(-)
diff --git a/core/core/src/layers/complete.rs b/core/core/src/layers/complete.rs
index 75fa9ee09..d9cf1a403 100644
--- a/core/core/src/layers/complete.rs
+++ b/core/core/src/layers/complete.rs
@@ -153,10 +153,32 @@ impl<R: oio::Read> oio::Read for CompleteReader<R> {
}
}
+/// Tracks the state of the Write operation.
+/// A successful operation goes through states: Open -> Written -> Closed
+/// A failed operation terminates in the Error state
+#[derive(Debug, PartialEq, Eq)]
+enum CompleteState {
+ Open,
+ Written,
+ Closed,
+ Error,
+}
+
+impl CompleteState {
+ /// Attempt to transition to the destination state. Once CompleteState has
+ /// errored all further transitions are ignored.
+ fn transition(&mut self, destination: CompleteState) {
+ if *self != CompleteState::Error {
+ *self = destination
+ }
+ }
+}
+
pub struct CompleteWriter<W> {
inner: Option<W>,
append: bool,
size: u64,
+ state: CompleteState,
}
impl<W> CompleteWriter<W> {
@@ -165,6 +187,7 @@ impl<W> CompleteWriter<W> {
inner: Some(inner),
append,
size: 0,
+ state: CompleteState::Open,
}
}
@@ -194,8 +217,10 @@ impl<W> CompleteWriter<W> {
#[cfg(debug_assertions)]
impl<W> Drop for CompleteWriter<W> {
fn drop(&mut self) {
- if self.inner.is_some() {
- log::warn!("writer has not been closed or aborted, must be a bug")
+ if self.state == CompleteState::Written {
+ log::warn!(
+ "writer has not been closed or aborted after successful write
operation, must be a bug"
+ )
}
}
}
@@ -206,29 +231,47 @@ where
{
async fn write(&mut self, bs: Buffer) -> Result<()> {
let w = self.inner.as_mut().ok_or_else(|| {
+ debug_assert_ne!(
+ self.state,
+ CompleteState::Open,
+ "bug: inner is empty, but state is Open"
+ );
Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
})?;
let len = bs.len();
- w.write(bs).await?;
+ w.write(bs)
+ .await
+ .inspect_err(|_| self.state.transition(CompleteState::Error))?;
self.size += len as u64;
+ self.state.transition(CompleteState::Written);
Ok(())
}
async fn close(&mut self) -> Result<Metadata> {
let w = self.inner.as_mut().ok_or_else(|| {
+ debug_assert_ne!(
+ self.state,
+ CompleteState::Open,
+ "bug: inner is empty, but state is Open"
+ );
Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
})?;
// we must return `Err` before setting inner to None; otherwise,
// we won't be able to retry `close` in `RetryLayer`.
- let mut ret = w.close().await?;
- self.check(ret.content_length())?;
+ let mut ret = w
+ .close()
+ .await
+ .inspect_err(|_| self.state.transition(CompleteState::Error))?;
+ self.check(ret.content_length())
+ .inspect_err(|_| self.state.transition(CompleteState::Error))?;
if ret.content_length() == 0 {
ret = ret.with_content_length(self.size);
}
self.inner = None;
+ self.state.transition(CompleteState::Closed);
Ok(ret)
}
@@ -238,8 +281,11 @@ where
Error::new(ErrorKind::Unexpected, "writer has been closed or
aborted")
})?;
- w.abort().await?;
+ w.abort()
+ .await
+ .inspect_err(|_| self.state.transition(CompleteState::Error))?;
self.inner = None;
+ self.state.transition(CompleteState::Closed);
Ok(())
}