This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 2fed1a18 feat(tests): Add fuzz test for writer without content length (#2100)
2fed1a18 is described below
commit 2fed1a187de141a23912a0d62e94b3ccf02b5cee
Author: Xuanwo <[email protected]>
AuthorDate: Mon Apr 24 18:50:46 2023 +0800
feat(tests): Add fuzz test for writer without content length (#2100)
* feat(tests): Add fuzz test for writer
Signed-off-by: Xuanwo <[email protected]>
* Fix writer could lost data
Signed-off-by: Xuanwo <[email protected]>
* change to log warning
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/src/layers/complete.rs | 51 ++++++++++++++++++---
core/src/layers/logging.rs | 37 +++++++++-------
core/src/services/fs/backend.rs | 1 +
core/src/services/gcs/backend.rs | 1 +
core/src/services/oss/backend.rs | 1 +
core/src/services/s3/backend.rs | 2 +
core/src/types/writer.rs | 19 +++++++-
core/tests/behavior/utils.rs | 96 ++++++++++++++++++++++++++++++++++++++++
core/tests/behavior/write.rs | 28 ++++++++++++
9 files changed, 212 insertions(+), 24 deletions(-)
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 26791208..f5b25b66 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -509,7 +509,7 @@ where
}
pub struct CompleteWriter<W> {
- inner: W,
+ inner: Option<W>,
size: Option<u64>,
written: u64,
}
@@ -517,13 +517,25 @@ pub struct CompleteWriter<W> {
impl<W> CompleteWriter<W> {
pub fn new(inner: W, size: Option<u64>) -> CompleteWriter<W> {
CompleteWriter {
- inner,
+ inner: Some(inner),
size,
written: 0,
}
}
}
+/// Check if the writer has been closed or aborted while debug_assertions
+/// enabled. This code will never be executed in release mode.
+#[cfg(debug_assertions)]
+impl<W> Drop for CompleteWriter<W> {
+ fn drop(&mut self) {
+ if self.inner.is_some() {
+ // Do we need to panic here?
+ log::warn!("writer has not been closed or aborted, must be a bug")
+ }
+ }
+}
+
#[async_trait]
impl<W> oio::Write for CompleteWriter<W>
where
@@ -544,13 +556,23 @@ where
}
}
- self.inner.write(bs).await?;
+ let w = self.inner.as_mut().ok_or_else(|| {
+ Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
+ })?;
+ w.write(bs).await?;
self.written += n as u64;
Ok(())
}
async fn abort(&mut self) -> Result<()> {
- self.inner.abort().await
+ let w = self.inner.as_mut().ok_or_else(|| {
+ Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
+ })?;
+
+ w.abort().await?;
+ self.inner = None;
+
+ Ok(())
}
async fn close(&mut self) -> Result<()> {
@@ -566,7 +588,13 @@ where
}
}
- self.inner.close().await?;
+ let w = self.inner.as_mut().ok_or_else(|| {
+ Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
+ })?;
+
+ w.close().await?;
+ self.inner = None;
+
Ok(())
}
}
@@ -590,7 +618,11 @@ where
}
}
- self.inner.write(bs)?;
+ let w = self.inner.as_mut().ok_or_else(|| {
+ Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
+ })?;
+
+ w.write(bs)?;
self.written += n as u64;
Ok(())
}
@@ -608,7 +640,12 @@ where
}
}
- self.inner.close()?;
+ let w = self.inner.as_mut().ok_or_else(|| {
+ Error::new(ErrorKind::Unexpected, "writer has been closed or aborted")
+ })?;
+
+ w.close()?;
+ self.inner = None;
Ok(())
}
}
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index a684ac5d..e5a83f14 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1340,19 +1340,6 @@ impl<W> LoggingWriter<W> {
}
}
-impl<W> Drop for LoggingWriter<W> {
- fn drop(&mut self) {
- debug!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} written={} -> data written finished",
- self.scheme,
- self.op,
- self.path,
- self.written
- );
- }
-}
-
#[async_trait]
impl<W: oio::Write> oio::Write for LoggingWriter<W> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
@@ -1420,7 +1407,17 @@ impl<W: oio::Write> oio::Write for LoggingWriter<W> {
async fn close(&mut self) -> Result<()> {
match self.inner.close().await {
- Ok(_) => Ok(()),
+ Ok(_) => {
+ debug!(
+ target: LOGGING_TARGET,
+ "service={} operation={} path={} written={} -> data written finished",
+ self.scheme,
+ self.op,
+ self.path,
+ self.written
+ );
+ Ok(())
+ }
Err(err) => {
if let Some(lvl) = self.failure_level {
log!(
@@ -1475,7 +1472,17 @@ impl<W: oio::BlockingWrite> oio::BlockingWrite for LoggingWriter<W> {
fn close(&mut self) -> Result<()> {
match self.inner.close() {
- Ok(_) => Ok(()),
+ Ok(_) => {
+ debug!(
+ target: LOGGING_TARGET,
+ "service={} operation={} path={} written={} -> data written finished",
+ self.scheme,
+ self.op,
+ self.path,
+ self.written
+ );
+ Ok(())
+ }
Err(err) => {
if let Some(lvl) = self.failure_level {
log!(
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 4e78321c..5509663b 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -304,6 +304,7 @@ impl Accessor for FsBackend {
read: true,
read_can_seek: true,
write: true,
+ write_without_content_length: true,
create_dir: true,
list: true,
copy: true,
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index c2388dae..778c7781 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -377,6 +377,7 @@ impl Accessor for GcsBackend {
read: true,
read_can_next: true,
write: true,
+ write_without_content_length: true,
list: true,
scan: true,
copy: true,
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 276fc32e..8748c7dd 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -367,6 +367,7 @@ impl Accessor for OssBackend {
read: true,
read_can_next: true,
write: true,
+ write_without_content_length: true,
list: true,
scan: true,
copy: true,
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index eed0335e..0adf125a 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -924,6 +924,8 @@ impl Accessor for S3Backend {
read_with_override_content_disposition: true,
write: true,
+ write_without_content_length: true,
+
list: true,
scan: true,
copy: true,
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index 87d88de8..e9e4243a 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -37,6 +37,11 @@ use crate::*;
///
/// ## Notes
///
+/// Please make sure either `close` or `abort` has been called before
+/// dropping the writer otherwise the data could be lost.
+///
+/// ## Notes
+///
/// Writer can be used in two ways:
///
/// - Sized: write data with a known size by specify the content length.
@@ -81,7 +86,12 @@ impl Writer {
}
}
- /// Abort inner writer.
+ /// Abort the writer and clean up all written data.
+ ///
+ /// ## Notes
+ ///
+ /// Abort should only be called when the writer is not closed or
+ /// aborted, otherwise an unexpected error could be returned.
pub async fn abort(&mut self) -> Result<()> {
if let State::Idle(Some(w)) = &mut self.state {
w.abort().await
@@ -93,7 +103,12 @@ impl Writer {
}
}
- /// Close the writer and make sure all data have been stored.
+ /// Close the writer and make sure all data have been committed.
+ ///
+ /// ## Notes
+ ///
+ /// Close should only be called when the writer is not closed or
+ /// aborted, otherwise an unexpected error could be returned.
pub async fn close(&mut self) -> Result<()> {
if let State::Idle(Some(w)) = &mut self.state {
w.close().await
diff --git a/core/tests/behavior/utils.rs b/core/tests/behavior/utils.rs
index 5592ecfe..fdf877d6 100644
--- a/core/tests/behavior/utils.rs
+++ b/core/tests/behavior/utils.rs
@@ -17,6 +17,9 @@
use std::collections::HashMap;
use std::env;
+use std::fmt;
+use std::fmt::Debug;
+use std::fmt::Formatter;
use std::io::SeekFrom;
use std::usize;
@@ -279,3 +282,96 @@ impl ObjectReaderFuzzer {
}
}
}
+
+/// ObjectWriterFuzzer is the fuzzer for object writer.
+///
+/// We will generate random write operations to operate on object
+/// write to check if the output is expected.
+///
+/// # TODO
+///
+/// This fuzzer only generates valid operations.
+///
+/// In the future, we need to generate invalid operations to check if we
+/// handled correctly.
+pub struct ObjectWriterFuzzer {
+ name: String,
+ bs: Vec<u8>,
+
+ size: Option<usize>,
+ cur: usize,
+ rng: ThreadRng,
+ actions: Vec<ObjectWriterAction>,
+}
+
+#[derive(Clone)]
+pub enum ObjectWriterAction {
+ Write(Bytes),
+}
+
+impl Debug for ObjectWriterAction {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ match self {
+ ObjectWriterAction::Write(bs) => write!(f, "Write({})", bs.len()),
+ }
+ }
+}
+
+impl ObjectWriterFuzzer {
+ /// Create a new fuzzer.
+ pub fn new(name: &str, size: Option<usize>) -> Self {
+ Self {
+ name: name.to_string(),
+ bs: Vec::new(),
+
+ size,
+ cur: 0,
+
+ rng: thread_rng(),
+ actions: vec![],
+ }
+ }
+
+ /// Generate a new action.
+ pub fn fuzz(&mut self) -> ObjectWriterAction {
+ let max = if let Some(size) = self.size {
+ size - self.cur
+ } else {
+ // Set max to 1MiB
+ 1024 * 1024
+ };
+
+ let size = self.rng.gen_range(0..max);
+
+ let mut bs = vec![0; size];
+ self.rng.fill_bytes(&mut bs);
+
+ let bs = Bytes::from(bs);
+ self.bs.extend_from_slice(&bs);
+ self.cur += bs.len();
+
+ let action = ObjectWriterAction::Write(bs);
+ debug!("{} perform fuzz action: {:?}", self.name, action);
+
+ self.actions.push(action.clone());
+
+ action
+ }
+
+ /// Check if the written content matches the expected bytes.
+ pub fn check(&mut self, actual_bs: &[u8]) {
+ assert_eq!(
+ self.bs.len(),
+ actual_bs.len(),
+ "check failed: expected len is different with actual len, actions: {:?}",
+ self.actions
+ );
+
+ assert_eq!(
+ format!("{:x}", Sha256::digest(&self.bs)),
+ format!("{:x}", Sha256::digest(actual_bs)),
+ "check failed: expected bs is different with actual bs, actions: {:?}",
+ self.actions,
+ );
+ }
+}
diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs
index 4b76128d..39a6d98d 100644
--- a/core/tests/behavior/write.rs
+++ b/core/tests/behavior/write.rs
@@ -104,6 +104,7 @@ macro_rules! behavior_write_tests {
test_writer_write,
test_writer_abort,
test_writer_futures_copy,
+ test_fuzz_unsized_writer,
);
)*
};
@@ -885,3 +886,30 @@ pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
op.delete(&path).await.expect("delete must succeed");
Ok(())
}
+
+/// Add test for unsized writer
+pub async fn test_fuzz_unsized_writer(op: Operator) -> Result<()> {
+ if !op.info().capability().write_without_content_length {
+ warn!("{op:?} doesn't support write without content length, test skip");
+ return Ok(());
+ }
+
+ let path = uuid::Uuid::new_v4().to_string();
+
+ let mut fuzzer = ObjectWriterFuzzer::new(&path, None);
+
+ let mut w = op.writer(&path).await?;
+
+ for _ in 0..100 {
+ match fuzzer.fuzz() {
+ ObjectWriterAction::Write(bs) => w.write(bs).await?,
+ }
+ }
+ w.close().await?;
+
+ let content = op.read(&path).await?;
+ fuzzer.check(&content);
+
+ op.delete(&path).await.expect("delete must succeed");
+ Ok(())
+}