This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 38afc9bc4c feat: add concurrent writer behavior tests (#3920)
38afc9bc4c is described below
commit 38afc9bc4c5316698ecfdf23fada89b7c49d698c
Author: Weny Xu <[email protected]>
AuthorDate: Fri Jan 5 22:10:08 2024 +0900
feat: add concurrent writer behavior tests (#3920)
---
core/tests/behavior/async_write.rs | 189 ++++++++++++++++++++++++++++++++++++-
1 file changed, 188 insertions(+), 1 deletion(-)
diff --git a/core/tests/behavior/async_write.rs
b/core/tests/behavior/async_write.rs
index 7eb99df271..282d4076d2 100644
--- a/core/tests/behavior/async_write.rs
+++ b/core/tests/behavior/async_write.rs
@@ -42,10 +42,15 @@ pub fn tests(op: &Operator, tests: &mut Vec<Trial>) {
test_write_with_content_type,
test_write_with_content_disposition,
test_writer_write,
+ test_writer_write_with_concurrent,
test_writer_sink,
+ test_writer_sink_with_concurrent,
test_writer_copy,
+ test_writer_copy_with_concurrent,
test_writer_abort,
- test_writer_futures_copy
+ test_writer_abort_with_concurrent,
+ test_writer_futures_copy,
+ test_writer_futures_copy_with_concurrent
))
}
@@ -221,6 +226,33 @@ pub async fn test_writer_abort(op: Operator) -> Result<()>
{
Ok(())
}
+/// Delete existing file should succeed.
+pub async fn test_writer_abort_with_concurrent(op: Operator) -> Result<()> {
+ let (path, content, _) = TEST_FIXTURE.new_file(op.clone());
+
+ let mut writer = match op.writer_with(&path).concurrent(2).await {
+ Ok(writer) => writer,
+ Err(e) => {
+ assert_eq!(e.kind(), ErrorKind::Unsupported);
+ return Ok(());
+ }
+ };
+
+ if let Err(e) = writer.write(content).await {
+ assert_eq!(e.kind(), ErrorKind::Unsupported);
+ return Ok(());
+ }
+
+ if let Err(e) = writer.abort().await {
+ assert_eq!(e.kind(), ErrorKind::Unsupported);
+ return Ok(());
+ }
+
+ // Aborted writer should not write actual file.
+ assert!(!op.is_exist(&path).await?);
+ Ok(())
+}
+
/// Append data into writer
pub async fn test_writer_write(op: Operator) -> Result<()> {
if !(op.info().full_capability().write_can_multi) {
@@ -256,6 +288,41 @@ pub async fn test_writer_write(op: Operator) -> Result<()>
{
Ok(())
}
+/// Append data into writer
+pub async fn test_writer_write_with_concurrent(op: Operator) -> Result<()> {
+ if !(op.info().full_capability().write_can_multi) {
+ return Ok(());
+ }
+
+ let path = TEST_FIXTURE.new_file_path();
+ let size = 5 * 1024 * 1024; // write file with 5 MiB
+ let content_a = gen_fixed_bytes(size);
+ let content_b = gen_fixed_bytes(size);
+
+ let mut w = op.writer_with(&path).concurrent(2).await?;
+ w.write(content_a.clone()).await?;
+ w.write(content_b.clone()).await?;
+ w.close().await?;
+
+ let meta = op.stat(&path).await.expect("stat must succeed");
+ assert_eq!(meta.content_length(), (size * 2) as u64);
+
+ let bs = op.read(&path).await?;
+ assert_eq!(bs.len(), size * 2, "read size");
+ assert_eq!(
+ format!("{:x}", Sha256::digest(&bs[..size])),
+ format!("{:x}", Sha256::digest(content_a)),
+ "read content a"
+ );
+ assert_eq!(
+ format!("{:x}", Sha256::digest(&bs[size..])),
+ format!("{:x}", Sha256::digest(content_b)),
+ "read content b"
+ );
+
+ Ok(())
+}
+
/// Streaming data into writer
pub async fn test_writer_sink(op: Operator) -> Result<()> {
let cap = op.info().full_capability();
@@ -292,6 +359,46 @@ pub async fn test_writer_sink(op: Operator) -> Result<()> {
Ok(())
}
+/// Streaming data into writer
+pub async fn test_writer_sink_with_concurrent(op: Operator) -> Result<()> {
+ let cap = op.info().full_capability();
+ if !(cap.write && cap.write_can_multi) {
+ return Ok(());
+ }
+
+ let path = TEST_FIXTURE.new_file_path();
+ let size = 5 * 1024 * 1024; // write file with 5 MiB
+ let content_a = gen_fixed_bytes(size);
+ let content_b = gen_fixed_bytes(size);
+ let stream = stream::iter(vec![content_a.clone(),
content_b.clone()]).map(Ok);
+
+ let mut w = op
+ .writer_with(&path)
+ .buffer(5 * 1024 * 1024)
+ .concurrent(4)
+ .await?;
+ w.sink(stream).await?;
+ w.close().await?;
+
+ let meta = op.stat(&path).await.expect("stat must succeed");
+ assert_eq!(meta.content_length(), (size * 2) as u64);
+
+ let bs = op.read(&path).await?;
+ assert_eq!(bs.len(), size * 2, "read size");
+ assert_eq!(
+ format!("{:x}", Sha256::digest(&bs[..size])),
+ format!("{:x}", Sha256::digest(content_a)),
+ "read content a"
+ );
+ assert_eq!(
+ format!("{:x}", Sha256::digest(&bs[size..])),
+ format!("{:x}", Sha256::digest(content_b)),
+ "read content b"
+ );
+
+ Ok(())
+}
+
/// Reading data into writer
pub async fn test_writer_copy(op: Operator) -> Result<()> {
let cap = op.info().full_capability();
@@ -333,6 +440,51 @@ pub async fn test_writer_copy(op: Operator) -> Result<()> {
Ok(())
}
+/// Reading data into writer
+pub async fn test_writer_copy_with_concurrent(op: Operator) -> Result<()> {
+ let cap = op.info().full_capability();
+ if !(cap.write && cap.write_can_multi) {
+ return Ok(());
+ }
+
+ let path = TEST_FIXTURE.new_file_path();
+ let size = 5 * 1024 * 1024; // write file with 5 MiB
+ let content_a = gen_fixed_bytes(size);
+ let content_b = gen_fixed_bytes(size);
+
+ let mut w = op
+ .writer_with(&path)
+ .buffer(5 * 1024 * 1024)
+ .concurrent(4)
+ .await?;
+
+ let mut content = Bytes::from([content_a.clone(),
content_b.clone()].concat());
+ while !content.is_empty() {
+ let reader = Cursor::new(content.clone());
+ let n = w.copy(reader).await?;
+ content.advance(n as usize);
+ }
+ w.close().await?;
+
+ let meta = op.stat(&path).await.expect("stat must succeed");
+ assert_eq!(meta.content_length(), (size * 2) as u64);
+
+ let bs = op.read(&path).await?;
+ assert_eq!(bs.len(), size * 2, "read size");
+ assert_eq!(
+ format!("{:x}", Sha256::digest(&bs[..size])),
+ format!("{:x}", Sha256::digest(content_a)),
+ "read content a"
+ );
+ assert_eq!(
+ format!("{:x}", Sha256::digest(&bs[size..])),
+ format!("{:x}", Sha256::digest(content_b)),
+ "read content b"
+ );
+
+ Ok(())
+}
+
/// Copy data from reader to writer
pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
if !(op.info().full_capability().write_can_multi) {
@@ -364,6 +516,41 @@ pub async fn test_writer_futures_copy(op: Operator) ->
Result<()> {
Ok(())
}
+/// Copy data from reader to writer
+pub async fn test_writer_futures_copy_with_concurrent(op: Operator) ->
Result<()> {
+ if !(op.info().full_capability().write_can_multi) {
+ return Ok(());
+ }
+
+ let path = TEST_FIXTURE.new_file_path();
+ let (content, size): (Vec<u8>, usize) =
+ gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);
+
+ let mut w = op
+ .writer_with(&path)
+ .buffer(8 * 1024 * 1024)
+ .concurrent(4)
+ .await?;
+
+ // Wrap a buf reader here to make sure content is read in 1MiB chunks.
+ let mut cursor = BufReader::with_capacity(1024 * 1024,
Cursor::new(content.clone()));
+ futures::io::copy_buf(&mut cursor, &mut w).await?;
+ w.close().await?;
+
+ let meta = op.stat(&path).await.expect("stat must succeed");
+ assert_eq!(meta.content_length(), size as u64);
+
+ let bs = op.read(&path).await?;
+ assert_eq!(bs.len(), size, "read size");
+ assert_eq!(
+ format!("{:x}", Sha256::digest(&bs[..size])),
+ format!("{:x}", Sha256::digest(content)),
+ "read content"
+ );
+
+ Ok(())
+}
+
/// Test append to a file must success.
pub async fn test_write_with_append(op: Operator) -> Result<()> {
let path = TEST_FIXTURE.new_file_path();