This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 38afc9bc4c feat: add concurrent writer behavior tests (#3920)
38afc9bc4c is described below

commit 38afc9bc4c5316698ecfdf23fada89b7c49d698c
Author: Weny Xu <[email protected]>
AuthorDate: Fri Jan 5 22:10:08 2024 +0900

    feat: add concurrent writer behavior tests (#3920)
---
 core/tests/behavior/async_write.rs | 189 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 188 insertions(+), 1 deletion(-)

diff --git a/core/tests/behavior/async_write.rs 
b/core/tests/behavior/async_write.rs
index 7eb99df271..282d4076d2 100644
--- a/core/tests/behavior/async_write.rs
+++ b/core/tests/behavior/async_write.rs
@@ -42,10 +42,15 @@ pub fn tests(op: &Operator, tests: &mut Vec<Trial>) {
             test_write_with_content_type,
             test_write_with_content_disposition,
             test_writer_write,
+            test_writer_write_with_concurrent,
             test_writer_sink,
+            test_writer_sink_with_concurrent,
             test_writer_copy,
+            test_writer_copy_with_concurrent,
             test_writer_abort,
-            test_writer_futures_copy
+            test_writer_abort_with_concurrent,
+            test_writer_futures_copy,
+            test_writer_futures_copy_with_concurrent
         ))
     }
 
@@ -221,6 +226,33 @@ pub async fn test_writer_abort(op: Operator) -> Result<()> 
{
     Ok(())
 }
 
+/// Delete existing file should succeed.
+pub async fn test_writer_abort_with_concurrent(op: Operator) -> Result<()> {
+    let (path, content, _) = TEST_FIXTURE.new_file(op.clone());
+
+    let mut writer = match op.writer_with(&path).concurrent(2).await {
+        Ok(writer) => writer,
+        Err(e) => {
+            assert_eq!(e.kind(), ErrorKind::Unsupported);
+            return Ok(());
+        }
+    };
+
+    if let Err(e) = writer.write(content).await {
+        assert_eq!(e.kind(), ErrorKind::Unsupported);
+        return Ok(());
+    }
+
+    if let Err(e) = writer.abort().await {
+        assert_eq!(e.kind(), ErrorKind::Unsupported);
+        return Ok(());
+    }
+
+    // Aborted writer should not write actual file.
+    assert!(!op.is_exist(&path).await?);
+    Ok(())
+}
+
 /// Append data into writer
 pub async fn test_writer_write(op: Operator) -> Result<()> {
     if !(op.info().full_capability().write_can_multi) {
@@ -256,6 +288,41 @@ pub async fn test_writer_write(op: Operator) -> Result<()> 
{
     Ok(())
 }
 
+/// Append data into writer
+pub async fn test_writer_write_with_concurrent(op: Operator) -> Result<()> {
+    if !(op.info().full_capability().write_can_multi) {
+        return Ok(());
+    }
+
+    let path = TEST_FIXTURE.new_file_path();
+    let size = 5 * 1024 * 1024; // write file with 5 MiB
+    let content_a = gen_fixed_bytes(size);
+    let content_b = gen_fixed_bytes(size);
+
+    let mut w = op.writer_with(&path).concurrent(2).await?;
+    w.write(content_a.clone()).await?;
+    w.write(content_b.clone()).await?;
+    w.close().await?;
+
+    let meta = op.stat(&path).await.expect("stat must succeed");
+    assert_eq!(meta.content_length(), (size * 2) as u64);
+
+    let bs = op.read(&path).await?;
+    assert_eq!(bs.len(), size * 2, "read size");
+    assert_eq!(
+        format!("{:x}", Sha256::digest(&bs[..size])),
+        format!("{:x}", Sha256::digest(content_a)),
+        "read content a"
+    );
+    assert_eq!(
+        format!("{:x}", Sha256::digest(&bs[size..])),
+        format!("{:x}", Sha256::digest(content_b)),
+        "read content b"
+    );
+
+    Ok(())
+}
+
 /// Streaming data into writer
 pub async fn test_writer_sink(op: Operator) -> Result<()> {
     let cap = op.info().full_capability();
@@ -292,6 +359,46 @@ pub async fn test_writer_sink(op: Operator) -> Result<()> {
     Ok(())
 }
 
+/// Streaming data into writer
+pub async fn test_writer_sink_with_concurrent(op: Operator) -> Result<()> {
+    let cap = op.info().full_capability();
+    if !(cap.write && cap.write_can_multi) {
+        return Ok(());
+    }
+
+    let path = TEST_FIXTURE.new_file_path();
+    let size = 5 * 1024 * 1024; // write file with 5 MiB
+    let content_a = gen_fixed_bytes(size);
+    let content_b = gen_fixed_bytes(size);
+    let stream = stream::iter(vec![content_a.clone(), 
content_b.clone()]).map(Ok);
+
+    let mut w = op
+        .writer_with(&path)
+        .buffer(5 * 1024 * 1024)
+        .concurrent(4)
+        .await?;
+    w.sink(stream).await?;
+    w.close().await?;
+
+    let meta = op.stat(&path).await.expect("stat must succeed");
+    assert_eq!(meta.content_length(), (size * 2) as u64);
+
+    let bs = op.read(&path).await?;
+    assert_eq!(bs.len(), size * 2, "read size");
+    assert_eq!(
+        format!("{:x}", Sha256::digest(&bs[..size])),
+        format!("{:x}", Sha256::digest(content_a)),
+        "read content a"
+    );
+    assert_eq!(
+        format!("{:x}", Sha256::digest(&bs[size..])),
+        format!("{:x}", Sha256::digest(content_b)),
+        "read content b"
+    );
+
+    Ok(())
+}
+
 /// Reading data into writer
 pub async fn test_writer_copy(op: Operator) -> Result<()> {
     let cap = op.info().full_capability();
@@ -333,6 +440,51 @@ pub async fn test_writer_copy(op: Operator) -> Result<()> {
     Ok(())
 }
 
+/// Reading data into writer
+pub async fn test_writer_copy_with_concurrent(op: Operator) -> Result<()> {
+    let cap = op.info().full_capability();
+    if !(cap.write && cap.write_can_multi) {
+        return Ok(());
+    }
+
+    let path = TEST_FIXTURE.new_file_path();
+    let size = 5 * 1024 * 1024; // write file with 5 MiB
+    let content_a = gen_fixed_bytes(size);
+    let content_b = gen_fixed_bytes(size);
+
+    let mut w = op
+        .writer_with(&path)
+        .buffer(5 * 1024 * 1024)
+        .concurrent(4)
+        .await?;
+
+    let mut content = Bytes::from([content_a.clone(), 
content_b.clone()].concat());
+    while !content.is_empty() {
+        let reader = Cursor::new(content.clone());
+        let n = w.copy(reader).await?;
+        content.advance(n as usize);
+    }
+    w.close().await?;
+
+    let meta = op.stat(&path).await.expect("stat must succeed");
+    assert_eq!(meta.content_length(), (size * 2) as u64);
+
+    let bs = op.read(&path).await?;
+    assert_eq!(bs.len(), size * 2, "read size");
+    assert_eq!(
+        format!("{:x}", Sha256::digest(&bs[..size])),
+        format!("{:x}", Sha256::digest(content_a)),
+        "read content a"
+    );
+    assert_eq!(
+        format!("{:x}", Sha256::digest(&bs[size..])),
+        format!("{:x}", Sha256::digest(content_b)),
+        "read content b"
+    );
+
+    Ok(())
+}
+
 /// Copy data from reader to writer
 pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
     if !(op.info().full_capability().write_can_multi) {
@@ -364,6 +516,41 @@ pub async fn test_writer_futures_copy(op: Operator) -> 
Result<()> {
     Ok(())
 }
 
+/// Copy data from reader to writer
+pub async fn test_writer_futures_copy_with_concurrent(op: Operator) -> 
Result<()> {
+    if !(op.info().full_capability().write_can_multi) {
+        return Ok(());
+    }
+
+    let path = TEST_FIXTURE.new_file_path();
+    let (content, size): (Vec<u8>, usize) =
+        gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);
+
+    let mut w = op
+        .writer_with(&path)
+        .buffer(8 * 1024 * 1024)
+        .concurrent(4)
+        .await?;
+
+    // Wrap a buf reader here to make sure content is read in 1MiB chunks.
+    let mut cursor = BufReader::with_capacity(1024 * 1024, 
Cursor::new(content.clone()));
+    futures::io::copy_buf(&mut cursor, &mut w).await?;
+    w.close().await?;
+
+    let meta = op.stat(&path).await.expect("stat must succeed");
+    assert_eq!(meta.content_length(), size as u64);
+
+    let bs = op.read(&path).await?;
+    assert_eq!(bs.len(), size, "read size");
+    assert_eq!(
+        format!("{:x}", Sha256::digest(&bs[..size])),
+        format!("{:x}", Sha256::digest(content)),
+        "read content"
+    );
+
+    Ok(())
+}
+
 /// Test append to a file must success.
 pub async fn test_write_with_append(op: Operator) -> Result<()> {
     let path = TEST_FIXTURE.new_file_path();

Reply via email to