This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 2fed1a18 feat(tests): Add fuzz test for writer without content length (#2100)
2fed1a18 is described below

commit 2fed1a187de141a23912a0d62e94b3ccf02b5cee
Author: Xuanwo <[email protected]>
AuthorDate: Mon Apr 24 18:50:46 2023 +0800

    feat(tests): Add fuzz test for writer without content length (#2100)
    
    * feat(tests): Add fuzz test for writer
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix writer could lost data
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * change to log warning
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/complete.rs      | 51 ++++++++++++++++++---
 core/src/layers/logging.rs       | 37 +++++++++-------
 core/src/services/fs/backend.rs  |  1 +
 core/src/services/gcs/backend.rs |  1 +
 core/src/services/oss/backend.rs |  1 +
 core/src/services/s3/backend.rs  |  2 +
 core/src/types/writer.rs         | 19 +++++++-
 core/tests/behavior/utils.rs     | 96 ++++++++++++++++++++++++++++++++++++++++
 core/tests/behavior/write.rs     | 28 ++++++++++++
 9 files changed, 212 insertions(+), 24 deletions(-)

diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 26791208..f5b25b66 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -509,7 +509,7 @@ where
 }
 
 pub struct CompleteWriter<W> {
-    inner: W,
+    inner: Option<W>,
     size: Option<u64>,
     written: u64,
 }
@@ -517,13 +517,25 @@ pub struct CompleteWriter<W> {
 impl<W> CompleteWriter<W> {
     pub fn new(inner: W, size: Option<u64>) -> CompleteWriter<W> {
         CompleteWriter {
-            inner,
+            inner: Some(inner),
             size,
             written: 0,
         }
     }
 }
 
+/// Check if the writer has been closed or aborted while debug_assertions
+/// enabled. This code will never be executed in release mode.
+#[cfg(debug_assertions)]
+impl<W> Drop for CompleteWriter<W> {
+    fn drop(&mut self) {
+        if self.inner.is_some() {
+            // Do we need to panic here?
+            log::warn!("writer has not been closed or aborted, must be a bug")
+        }
+    }
+}
+
 #[async_trait]
 impl<W> oio::Write for CompleteWriter<W>
 where
@@ -544,13 +556,23 @@ where
             }
         }
 
-        self.inner.write(bs).await?;
+        let w = self.inner.as_mut().ok_or_else(|| {
+            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
+        })?;
+        w.write(bs).await?;
         self.written += n as u64;
         Ok(())
     }
 
     async fn abort(&mut self) -> Result<()> {
-        self.inner.abort().await
+        let w = self.inner.as_mut().ok_or_else(|| {
+            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
+        })?;
+
+        w.abort().await?;
+        self.inner = None;
+
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
@@ -566,7 +588,13 @@ where
             }
         }
 
-        self.inner.close().await?;
+        let w = self.inner.as_mut().ok_or_else(|| {
+            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
+        })?;
+
+        w.close().await?;
+        self.inner = None;
+
         Ok(())
     }
 }
@@ -590,7 +618,11 @@ where
             }
         }
 
-        self.inner.write(bs)?;
+        let w = self.inner.as_mut().ok_or_else(|| {
+            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
+        })?;
+
+        w.write(bs)?;
         self.written += n as u64;
         Ok(())
     }
@@ -608,7 +640,12 @@ where
             }
         }
 
-        self.inner.close()?;
+        let w = self.inner.as_mut().ok_or_else(|| {
+            Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
+        })?;
+
+        w.close()?;
+        self.inner = None;
         Ok(())
     }
 }
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index a684ac5d..e5a83f14 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1340,19 +1340,6 @@ impl<W> LoggingWriter<W> {
     }
 }
 
-impl<W> Drop for LoggingWriter<W> {
-    fn drop(&mut self) {
-        debug!(
-            target: LOGGING_TARGET,
-            "service={} operation={} path={} written={} -> data written finished",
-            self.scheme,
-            self.op,
-            self.path,
-            self.written
-        );
-    }
-}
-
 #[async_trait]
 impl<W: oio::Write> oio::Write for LoggingWriter<W> {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
@@ -1420,7 +1407,17 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
 
     async fn close(&mut self) -> Result<()> {
         match self.inner.close().await {
-            Ok(_) => Ok(()),
+            Ok(_) => {
+                debug!(
+                    target: LOGGING_TARGET,
+                    "service={} operation={} path={} written={} -> data written finished",
+                    self.scheme,
+                    self.op,
+                    self.path,
+                    self.written
+                );
+                Ok(())
+            }
             Err(err) => {
                 if let Some(lvl) = self.failure_level {
                     log!(
@@ -1475,7 +1472,17 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
 
     fn close(&mut self) -> Result<()> {
         match self.inner.close() {
-            Ok(_) => Ok(()),
+            Ok(_) => {
+                debug!(
+                    target: LOGGING_TARGET,
+                    "service={} operation={} path={} written={} -> data written finished",
+                    self.scheme,
+                    self.op,
+                    self.path,
+                    self.written
+                );
+                Ok(())
+            }
             Err(err) => {
                 if let Some(lvl) = self.failure_level {
                     log!(
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 4e78321c..5509663b 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -304,6 +304,7 @@ impl Accessor for FsBackend {
                 read: true,
                 read_can_seek: true,
                 write: true,
+                write_without_content_length: true,
                 create_dir: true,
                 list: true,
                 copy: true,
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index c2388dae..778c7781 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -377,6 +377,7 @@ impl Accessor for GcsBackend {
                 read: true,
                 read_can_next: true,
                 write: true,
+                write_without_content_length: true,
                 list: true,
                 scan: true,
                 copy: true,
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 276fc32e..8748c7dd 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -367,6 +367,7 @@ impl Accessor for OssBackend {
                 read: true,
                 read_can_next: true,
                 write: true,
+                write_without_content_length: true,
                 list: true,
                 scan: true,
                 copy: true,
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index eed0335e..0adf125a 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -924,6 +924,8 @@ impl Accessor for S3Backend {
                 read_with_override_content_disposition: true,
 
                 write: true,
+                write_without_content_length: true,
+
                 list: true,
                 scan: true,
                 copy: true,
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 87d88de8..e9e4243a 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -37,6 +37,11 @@ use crate::*;
 ///
 /// ## Notes
 ///
+/// Please make sure either `close` or `abort` has been called before
+/// dropping the writer otherwise the data could be lost.
+///
+/// ## Notes
+///
 /// Writer can be used in two ways:
 ///
 /// - Sized: write data with a known size by specify the content length.
@@ -81,7 +86,12 @@ impl Writer {
         }
     }
 
-    /// Abort inner writer.
+    /// Abort the writer and clean up all written data.
+    ///
+    /// ## Notes
+    ///
+    /// Abort should only be called when the writer is not closed or
+    /// aborted, otherwise an unexpected error could be returned.
     pub async fn abort(&mut self) -> Result<()> {
         if let State::Idle(Some(w)) = &mut self.state {
             w.abort().await
@@ -93,7 +103,12 @@ impl Writer {
         }
     }
 
-    /// Close the writer and make sure all data have been stored.
+    /// Close the writer and make sure all data have been committed.
+    ///
+    /// ## Notes
+    ///
+    /// Close should only be called when the writer is not closed or
+    /// aborted, otherwise an unexpected error could be returned.
     pub async fn close(&mut self) -> Result<()> {
         if let State::Idle(Some(w)) = &mut self.state {
             w.close().await
diff --git a/core/tests/behavior/utils.rs b/core/tests/behavior/utils.rs
index 5592ecfe..fdf877d6 100644
--- a/core/tests/behavior/utils.rs
+++ b/core/tests/behavior/utils.rs
@@ -17,6 +17,9 @@
 
 use std::collections::HashMap;
 use std::env;
+use std::fmt;
+use std::fmt::Debug;
+use std::fmt::Formatter;
 use std::io::SeekFrom;
 use std::usize;
 
@@ -279,3 +282,96 @@ impl ObjectReaderFuzzer {
         }
     }
 }
+
+/// ObjectWriterFuzzer is the fuzzer for object writer.
+///
+/// We will generate random write operations to operate on object
+/// write to check if the output is expected.
+///
+/// # TODO
+///
+/// This fuzzer only generate valid operations.
+///
+/// In the future, we need to generate invalid operations to check if we
+/// handled correctly.
+pub struct ObjectWriterFuzzer {
+    name: String,
+    bs: Vec<u8>,
+
+    size: Option<usize>,
+    cur: usize,
+    rng: ThreadRng,
+    actions: Vec<ObjectWriterAction>,
+}
+
+#[derive(Clone)]
+pub enum ObjectWriterAction {
+    Write(Bytes),
+}
+
+impl Debug for ObjectWriterAction {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        match self {
+            ObjectWriterAction::Write(bs) => write!(f, "Write({})", bs.len()),
+        }
+    }
+}
+
+impl ObjectWriterFuzzer {
+    /// Create a new fuzzer.
+    pub fn new(name: &str, size: Option<usize>) -> Self {
+        Self {
+            name: name.to_string(),
+            bs: Vec::new(),
+
+            size,
+            cur: 0,
+
+            rng: thread_rng(),
+            actions: vec![],
+        }
+    }
+
+    /// Generate a new action.
+    pub fn fuzz(&mut self) -> ObjectWriterAction {
+        let max = if let Some(size) = self.size {
+            size - self.cur
+        } else {
+            // Set max to 1MiB
+            1024 * 1024
+        };
+
+        let size = self.rng.gen_range(0..max);
+
+        let mut bs = vec![0; size];
+        self.rng.fill_bytes(&mut bs);
+
+        let bs = Bytes::from(bs);
+        self.bs.extend_from_slice(&bs);
+        self.cur += bs.len();
+
+        let action = ObjectWriterAction::Write(bs);
+        debug!("{} perform fuzz action: {:?}", self.name, action);
+
+        self.actions.push(action.clone());
+
+        action
+    }
+
+    /// Check if read operation is expected.
+    pub fn check(&mut self, actual_bs: &[u8]) {
+        assert_eq!(
+            self.bs.len(),
+            actual_bs.len(),
+            "check failed: expected len is different with actual len, actions: {:?}",
+            self.actions
+        );
+
+        assert_eq!(
+            format!("{:x}", Sha256::digest(&self.bs)),
+            format!("{:x}", Sha256::digest(actual_bs)),
+            "check failed: expected bs is different with actual bs, actions: {:?}",
+            self.actions,
+        );
+    }
+}
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index 4b76128d..39a6d98d 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -104,6 +104,7 @@ macro_rules! behavior_write_tests {
                 test_writer_write,
                 test_writer_abort,
                 test_writer_futures_copy,
+                test_fuzz_unsized_writer,
             );
         )*
     };
@@ -885,3 +886,30 @@ pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
     op.delete(&path).await.expect("delete must succeed");
     Ok(())
 }
+
+/// Add test for unsized writer
+pub async fn test_fuzz_unsized_writer(op: Operator) -> Result<()> {
+    if !op.info().capability().write_without_content_length {
+        warn!("{op:?} doesn't support write without content length, test skip");
+        return Ok(());
+    }
+
+    let path = uuid::Uuid::new_v4().to_string();
+
+    let mut fuzzer = ObjectWriterFuzzer::new(&path, None);
+
+    let mut w = op.writer(&path).await?;
+
+    for _ in 0..100 {
+        match fuzzer.fuzz() {
+            ObjectWriterAction::Write(bs) => w.write(bs).await?,
+        }
+    }
+    w.close().await?;
+
+    let content = op.read(&path).await?;
+    fuzzer.check(&content);
+
+    op.delete(&path).await.expect("delete must succeed");
+    Ok(())
+}

Reply via email to