This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 4b02228f7 test(blocking): tests for blocking append (#3023)
4b02228f7 is described below
commit 4b02228f7fe9a7c0f21a1660fee95716910c7a0a
Author: Suyan <[email protected]>
AuthorDate: Mon Sep 18 13:16:11 2023 +0800
test(blocking): tests for blocking append (#3023)
* test for blocking append & fix fs
Signed-off-by: suyanhanx <[email protected]>
* move path check to path build block
Signed-off-by: suyanhanx <[email protected]>
* path check when append enabled only
Signed-off-by: suyanhanx <[email protected]>
---------
Signed-off-by: suyanhanx <[email protected]>
---
core/src/services/fs/backend.rs | 46 +++++--
core/src/services/fs/writer.rs | 1 +
core/tests/behavior/blocking_append.rs | 220 +++++++++++++++++++++++++++++++++
core/tests/behavior/main.rs | 3 +
4 files changed, 262 insertions(+), 8 deletions(-)
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index b1f1fdf49..a225958d2 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -55,6 +55,11 @@ impl FsBuilder {
}
/// Set temp dir for atomic write.
+ ///
+ /// # Notes
+ ///
+ /// - When append is enabled and the target file already exists, we will
+ /// not use atomic write, to avoid data loss and performance issues.
pub fn atomic_write_dir(&mut self, dir: &str) -> &mut Self {
self.atomic_write_dir = if dir.is_empty() {
None
@@ -367,7 +372,17 @@ impl Accessor for FsBackend {
let target_path = Self::ensure_write_abs_path(&self.root,
path).await?;
let tmp_path =
Self::ensure_write_abs_path(atomic_write_dir,
&tmp_file_of(path)).await?;
- (target_path, Some(tmp_path))
+
+ // If the target file exists, we should append to the end of it
directly.
+ if op.append()
+ && tokio::fs::try_exists(&target_path)
+ .await
+ .map_err(parse_io_error)?
+ {
+ (target_path, None)
+ } else {
+ (target_path, Some(tmp_path))
+ }
} else {
let p = Self::ensure_write_abs_path(&self.root, path).await?;
@@ -375,7 +390,6 @@ impl Accessor for FsBackend {
};
let mut open_options = tokio::fs::OpenOptions::new();
-
open_options.create(true).write(true);
if op.append() {
open_options.append(true);
@@ -554,22 +568,38 @@ impl Accessor for FsBackend {
Ok((RpRead::new(end - start), r))
}
- fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite,
Self::BlockingWriter)> {
+ fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite,
Self::BlockingWriter)> {
let (target_path, tmp_path) = if let Some(atomic_write_dir) =
&self.atomic_write_dir {
let target_path = Self::blocking_ensure_write_abs_path(&self.root,
path)?;
let tmp_path =
Self::blocking_ensure_write_abs_path(atomic_write_dir,
&tmp_file_of(path))?;
- (target_path, Some(tmp_path))
+
+ // If the target file exists, we should append to the end of it
directly.
+ if op.append()
+ && Path::new(&target_path)
+ .try_exists()
+ .map_err(parse_io_error)?
+ {
+ (target_path, None)
+ } else {
+ (target_path, Some(tmp_path))
+ }
} else {
let p = Self::blocking_ensure_write_abs_path(&self.root, path)?;
(p, None)
};
- let f = std::fs::OpenOptions::new()
- .create(true)
- .truncate(true)
- .write(true)
+ let mut f = std::fs::OpenOptions::new();
+ f.create(true).write(true);
+
+ if op.append() {
+ f.append(true);
+ } else {
+ f.truncate(true);
+ }
+
+ let f = f
.open(tmp_path.as_ref().unwrap_or(&target_path))
.map_err(parse_io_error)?;
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 5b1ae953e..d1283d4ca 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -45,6 +45,7 @@ impl<F> FsWriter<F> {
Self {
target_path,
tmp_path,
+
f: Some(f),
fut: None,
}
diff --git a/core/tests/behavior/blocking_append.rs
b/core/tests/behavior/blocking_append.rs
new file mode 100644
index 000000000..ccb89f786
--- /dev/null
+++ b/core/tests/behavior/blocking_append.rs
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::vec;
+
+use anyhow::Result;
+use sha2::Digest;
+use sha2::Sha256;
+use std::io::BufReader;
+use std::io::Cursor;
+
+use crate::*;
+
+/// Build the blocking append trials, or none if `op` lacks read/write/blocking/append.
+pub fn behavior_blocking_append_tests(op: &Operator) -> Vec<Trial> {
+    let cap = op.info().full_capability();
+    let supported = cap.read && cap.write && cap.blocking && cap.write_can_append;
+    if !supported {
+        return vec![];
+    }
+    blocking_trials!(
+        op,
+        test_blocking_append_create_append,
+        test_blocking_append_with_dir_path,
+        test_blocking_append_with_cache_control,
+        test_blocking_append_with_content_type,
+        test_blocking_append_with_content_disposition,
+        test_blocking_appender_std_copy,
+        test_blocking_fuzz_appender
+    )
+}
+
+/// Appending twice to the same path must yield both payloads, in order.
+pub fn test_blocking_append_create_append(op: BlockingOperator) -> Result<()> {
+    let path = uuid::Uuid::new_v4().to_string();
+    let (content_one, size_one) = gen_bytes();
+    let (content_two, size_two) = gen_bytes();
+
+    // The first append creates the file; the second must extend it.
+    for (chunk, msg) in [
+        (&content_one, "append file first time must success"),
+        (&content_two, "append to an existing file must success"),
+    ] {
+        op.write_with(&path, chunk.clone())
+            .append(true)
+            .call()
+            .expect(msg);
+    }
+
+    let bs = op.read(&path).expect("read file must success");
+    assert_eq!(bs.len(), size_one + size_two);
+    assert_eq!(&bs[..size_one], &content_one[..]);
+    assert_eq!(&bs[size_one..], &content_two[..]);
+
+    op.delete(&path).expect("delete file must success");
+
+    Ok(())
+}
+
+/// Appending to a path ending in `/` must fail with `ErrorKind::IsADirectory`.
+pub fn test_blocking_append_with_dir_path(op: BlockingOperator) -> Result<()> {
+    let (content, _) = gen_bytes();
+    let dir_path = format!("{}/", uuid::Uuid::new_v4());
+
+    let result = op.write_with(&dir_path, content).append(true).call();
+    assert!(result.is_err());
+    assert_eq!(result.unwrap_err().kind(), ErrorKind::IsADirectory);
+
+    Ok(())
+}
+
+/// Test append with cache control must succeed and keep the appended length.
+pub fn test_blocking_append_with_cache_control(op: BlockingOperator) -> Result<()> {
+    if !op.info().full_capability().write_with_cache_control {
+        return Ok(());
+    }
+
+    let path = uuid::Uuid::new_v4().to_string();
+    let (content, size) = gen_bytes();
+
+    let target_cache_control = "no-cache, no-store, max-age=300";
+    op.write_with(&path, content)
+        .append(true)
+        .cache_control(target_cache_control)
+        .call()?;
+
+    let meta = op.stat(&path).expect("stat must succeed");
+    assert_eq!(meta.mode(), EntryMode::FILE);
+    assert_eq!(
+        meta.cache_control().expect("cache control must exist"),
+        target_cache_control
+    );
+    assert_eq!(meta.content_length(), size as u64);
+
+    op.delete(&path).expect("delete must succeed");
+
+    Ok(())
+}
+
+/// Appending with a content type must persist that type and the written length.
+pub fn test_blocking_append_with_content_type(op: BlockingOperator) -> Result<()> {
+    let cap = op.info().full_capability();
+    if !cap.write_with_content_type {
+        return Ok(());
+    }
+
+    let target_content_type = "application/json";
+    let path = uuid::Uuid::new_v4().to_string();
+    let (content, size) = gen_bytes();
+
+    op.write_with(&path, content)
+        .append(true)
+        .content_type(target_content_type)
+        .call()?;
+
+    let meta = op.stat(&path).expect("stat must succeed");
+    assert_eq!(meta.mode(), EntryMode::FILE);
+    let content_type = meta.content_type().expect("content type must exist");
+    assert_eq!(content_type, target_content_type);
+    assert_eq!(meta.content_length(), size as u64);
+
+    op.delete(&path).expect("delete must succeed");
+
+    Ok(())
+}
+
+/// Test append with content disposition must succeed.
+pub fn test_blocking_append_with_content_disposition(op: BlockingOperator) -> Result<()> {
+    if !op.info().full_capability().write_with_content_disposition {
+        return Ok(());
+    }
+
+    let path = uuid::Uuid::new_v4().to_string();
+    let (content, size) = gen_bytes();
+
+    let target_content_disposition = "attachment; filename=\"filename.jpg\"";
+    op.write_with(&path, content)
+        .append(true)
+        .content_disposition(target_content_disposition)
+        .call()?;
+
+    let meta = op.stat(&path).expect("stat must succeed");
+    assert_eq!(meta.mode(), EntryMode::FILE);
+    assert_eq!(
+        meta.content_disposition().expect("content disposition must exist"),
+        target_content_disposition
+    );
+    assert_eq!(meta.content_length(), size as u64);
+
+    op.delete(&path).expect("delete must succeed");
+
+    Ok(())
+}
+
+/// Stream a large payload (`gen_bytes_with_range` over 10MiB..20MiB) into an appender via `std::io::copy`.
+pub fn test_blocking_appender_std_copy(op: BlockingOperator) -> Result<()> {
+    let path = uuid::Uuid::new_v4().to_string();
+    let (content, size): (Vec<u8>, usize) =
+        gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);
+
+    let mut writer = op.writer_with(&path).append(true).call()?;
+    // A 1MiB `BufReader` makes sure the content is read in 1MiB chunks.
+    let mut reader = BufReader::with_capacity(1024 * 1024, Cursor::new(content.clone()));
+    std::io::copy(&mut reader, &mut writer)?;
+    writer.close()?;
+
+    let meta = op.stat(&path).expect("stat must succeed");
+    assert_eq!(meta.content_length(), size as u64);
+
+    let bs = op.read(&path)?;
+    assert_eq!(bs.len(), size, "read size");
+    let actual_digest = format!("{:x}", Sha256::digest(&bs[..size]));
+    let expected_digest = format!("{:x}", Sha256::digest(content));
+    assert_eq!(actual_digest, expected_digest, "read content");
+
+    op.delete(&path).expect("delete must succeed");
+    Ok(())
+}
+
+/// Drive an appender with 100 fuzzed actions, then let the fuzzer check the read-back bytes.
+pub fn test_blocking_fuzz_appender(op: BlockingOperator) -> Result<()> {
+    let path = uuid::Uuid::new_v4().to_string();
+
+    let mut fuzzer = ObjectWriterFuzzer::new(&path, None);
+    let mut writer = op.writer_with(&path).append(true).call()?;
+
+    for _round in 0..100 {
+        let action = fuzzer.fuzz();
+        match action {
+            ObjectWriterAction::Write(bs) => {
+                writer.write(bs)?;
+            }
+        }
+    }
+    writer.close()?;
+
+    fuzzer.check(&op.read(&path)?);
+
+    op.delete(&path).expect("delete file must success");
+
+    Ok(())
+}
diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs
index c46ba4245..188da6365 100644
--- a/core/tests/behavior/main.rs
+++ b/core/tests/behavior/main.rs
@@ -42,12 +42,14 @@ use rename::behavior_rename_tests;
use write::behavior_write_tests;
// Blocking test cases
+mod blocking_append;
mod blocking_copy;
mod blocking_list;
mod blocking_read_only;
mod blocking_rename;
mod blocking_write;
+use blocking_append::behavior_blocking_append_tests;
use blocking_copy::behavior_blocking_copy_tests;
use blocking_list::behavior_blocking_list_tests;
use blocking_read_only::behavior_blocking_read_only_tests;
@@ -73,6 +75,7 @@ fn behavior_test<B: Builder>() -> Vec<Trial> {
let mut trials = vec![];
// Blocking tests
+ trials.extend(behavior_blocking_append_tests(&operator));
trials.extend(behavior_blocking_copy_tests(&operator));
trials.extend(behavior_blocking_list_tests(&operator));
trials.extend(behavior_blocking_read_only_tests(&operator));