This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 581cd9f11 feat(types): remove blocking operation range_read and
range_reader API (#2912)
581cd9f11 is described below
commit 581cd9f11a5f3702022fac99b7f3402eccb81b20
Author: oowl <[email protected]>
AuthorDate: Wed Aug 23 18:44:01 2023 +0800
feat(types): remove blocking operation range_read and range_reader API
(#2912)
* feat(types): add read_with and reader_with API to blocking operator
* feat(types): remove blocking operator range_read and range_reader API
* feat(types): fix code
---
core/src/types/operator/blocking_operator.rs | 116 ++++++++++++++------------
core/src/types/operator/operator_functions.rs | 40 +++++++++
core/tests/behavior/blocking_read_only.rs | 6 +-
core/tests/behavior/blocking_write.rs | 16 ++--
4 files changed, 118 insertions(+), 60 deletions(-)
diff --git a/core/src/types/operator/blocking_operator.rs
b/core/src/types/operator/blocking_operator.rs
index 0650d2ba8..6d4b102f2 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -16,7 +16,6 @@
// under the License.
use std::io::Read;
-use std::ops::RangeBounds;
use bytes::Bytes;
@@ -230,54 +229,61 @@ impl BlockingOperator {
/// # }
/// ```
pub fn read(&self, path: &str) -> Result<Vec<u8>> {
- self.range_read(path, ..)
+ self.read_with(path).call()
}
- /// Read the specified range of path into a bytes.
+ /// Read the whole path into a bytes with extra options.
///
/// This function will allocate a new bytes internally. For more precise
memory control or
- /// reading data lazily, please use [`BlockingOperator::range_reader`]
+ /// reading data lazily, please use [`BlockingOperator::reader`]
///
/// # Examples
///
/// ```no_run
- /// # use std::io::Result;
- /// # use opendal::BlockingOperator;
- /// # use futures::TryStreamExt;
- /// # use opendal::Scheme;
+ /// # use anyhow::Result;
+ /// use opendal::BlockingOperator;
+ /// use opendal::EntryMode;
+ /// use opendal::Metakey;
/// # fn test(op: BlockingOperator) -> Result<()> {
- /// let bs = op.range_read("path/to/file", 1024..2048)?;
+ /// let bs = op.read_with("path/to/file").range(0..10).call()?;
/// # Ok(())
/// # }
/// ```
- pub fn range_read(&self, path: &str, range: impl RangeBounds<u64>) ->
Result<Vec<u8>> {
+ pub fn read_with(&self, path: &str) -> FunctionRead {
let path = normalize_path(path);
- if !validate_path(&path, EntryMode::FILE) {
- return Err(
- Error::new(ErrorKind::IsADirectory, "read path is a directory")
- .with_operation("BlockingOperator::range_read")
- .with_context("service",
self.info().scheme().into_static())
- .with_context("path", &path),
- );
- }
+ FunctionRead(OperatorFunction::new(
+ self.inner().clone(),
+ path,
+ OpRead::default(),
+ |inner, path, args| {
+ if !validate_path(&path, EntryMode::FILE) {
+ return Err(
+ Error::new(ErrorKind::IsADirectory, "read path is a
directory")
+ .with_operation("BlockingOperator::read_with")
+ .with_context("service",
inner.info().scheme().into_static())
+ .with_context("path", &path),
+ );
+ }
- let br = BytesRange::from(range);
- let (rp, mut s) = self
- .inner()
- .blocking_read(&path, OpRead::new().with_range(br))?;
-
- let mut buffer =
Vec::with_capacity(rp.into_metadata().content_length() as usize);
- s.read_to_end(&mut buffer).map_err(|err| {
- Error::new(ErrorKind::Unexpected, "blocking range read failed")
- .with_operation("BlockingOperator::range_read")
- .with_context("service", self.info().scheme().into_static())
- .with_context("path", path)
- .with_context("range", br.to_string())
- .set_source(err)
- })?;
-
- Ok(buffer)
+ let (rp, mut s) = inner.blocking_read(&path, args)?;
+ let mut buffer =
Vec::with_capacity(rp.into_metadata().content_length() as usize);
+
+ match s.read_to_end(&mut buffer) {
+ Ok(n) => {
+ buffer.truncate(n);
+ Ok(buffer)
+ }
+ Err(err) => Err(
+ Error::new(ErrorKind::Unexpected, "blocking read_with
failed")
+ .with_operation("BlockingOperator::read_with")
+ .with_context("service",
inner.info().scheme().into_static())
+ .with_context("path", &path)
+ .set_source(err),
+ ),
+ }
+ },
+ ))
}
/// Create a new reader which can read the whole path.
@@ -294,37 +300,43 @@ impl BlockingOperator {
/// # }
/// ```
pub fn reader(&self, path: &str) -> Result<BlockingReader> {
- self.range_reader(path, ..)
+ self.reader_with(path).call()
}
- /// Create a new reader which can read the specified range.
+ /// Create a new reader with extra options
///
/// # Examples
///
/// ```no_run
- /// # use std::io::Result;
- /// # use opendal::BlockingOperator;
- /// # use futures::TryStreamExt;
+ /// # use anyhow::Result;
+ /// use opendal::BlockingOperator;
+ /// use opendal::EntryMode;
+ /// use opendal::Metakey;
/// # fn test(op: BlockingOperator) -> Result<()> {
- /// let r = op.range_reader("path/to/file", 1024..2048)?;
+ /// let r = op.reader_with("path/to/file").range(0..10).call()?;
/// # Ok(())
/// # }
/// ```
- pub fn range_reader(&self, path: &str, range: impl RangeBounds<u64>) ->
Result<BlockingReader> {
+ pub fn reader_with(&self, path: &str) -> FunctionReader {
let path = normalize_path(path);
- if !validate_path(&path, EntryMode::FILE) {
- return Err(
- Error::new(ErrorKind::IsADirectory, "read path is a directory")
- .with_operation("BlockingOperator::range_reader")
- .with_context("service",
self.info().scheme().into_static())
- .with_context("path", &path),
- );
- }
-
- let op = OpRead::new().with_range(range.into());
+ FunctionReader(OperatorFunction::new(
+ self.inner().clone(),
+ path,
+ OpRead::default(),
+ |inner, path, args| {
+ if !validate_path(&path, EntryMode::FILE) {
+ return Err(
+ Error::new(ErrorKind::IsADirectory, "reader path is a
directory")
+ .with_operation("BlockingOperator::reader_with")
+ .with_context("service",
inner.info().scheme().into_static())
+ .with_context("path", &path),
+ );
+ }
- BlockingReader::create(self.inner().clone(), &path, op)
+ BlockingReader::create(inner.clone(), &path, args)
+ },
+ ))
}
/// Write bytes into given path.
diff --git a/core/src/types/operator/operator_functions.rs
b/core/src/types/operator/operator_functions.rs
index 2164bcafb..1000b7094 100644
--- a/core/src/types/operator/operator_functions.rs
+++ b/core/src/types/operator/operator_functions.rs
@@ -19,6 +19,8 @@
//!
//! By using functions, users can add more options for operation.
+use std::ops::RangeBounds;
+
use bytes::Bytes;
use flagset::FlagSet;
@@ -189,3 +191,41 @@ impl FunctionLister {
self.0.call()
}
}
+
+/// Function that generated by [`BlockingOperator::read_with`].
+///
+/// Users can add more options by public functions provided by this struct.
+pub struct FunctionRead(pub(crate) OperatorFunction<OpRead, Vec<u8>>);
+
+impl FunctionRead {
+ /// Set the range for this operation.
+ pub fn range(mut self, range: impl RangeBounds<u64>) -> Self {
+ self.0 = self.0.map_args(|args| args.with_range(range.into()));
+ self
+ }
+
+ /// Call the function to consume all the input and generate a
+ /// result.
+ pub fn call(self) -> Result<Vec<u8>> {
+ self.0.call()
+ }
+}
+
+/// Function that generated by [`BlockingOperator::reader_with`].
+///
+/// Users can add more options by public functions provided by this struct.
+pub struct FunctionReader(pub(crate) OperatorFunction<OpRead, BlockingReader>);
+
+impl FunctionReader {
+ /// Set the range for this operation.
+ pub fn range(mut self, range: impl RangeBounds<u64>) -> Self {
+ self.0 = self.0.map_args(|args| args.with_range(range.into()));
+ self
+ }
+
+ /// Call the function to consume all the input and generate a
+ /// result.
+ pub fn call(self) -> Result<BlockingReader> {
+ self.0.call()
+ }
+}
diff --git a/core/tests/behavior/blocking_read_only.rs
b/core/tests/behavior/blocking_read_only.rs
index 6c4f5e0ab..f351d13bf 100644
--- a/core/tests/behavior/blocking_read_only.rs
+++ b/core/tests/behavior/blocking_read_only.rs
@@ -34,7 +34,7 @@ pub fn behavior_blocking_read_only_tests(op: &Operator) ->
Vec<Trial> {
test_blocking_read_only_stat_special_chars,
test_blocking_read_only_stat_not_exist,
test_blocking_read_only_read_full,
- test_blocking_read_only_read_range,
+ test_blocking_read_only_read_with_range,
test_blocking_read_only_read_not_exist
)
}
@@ -88,8 +88,8 @@ pub fn test_blocking_read_only_read_full(op:
BlockingOperator) -> Result<()> {
}
/// Read full content should match.
-pub fn test_blocking_read_only_read_range(op: BlockingOperator) -> Result<()> {
- let bs = op.range_read("normal_file", 1024..2048)?;
+pub fn test_blocking_read_only_read_with_range(op: BlockingOperator) ->
Result<()> {
+ let bs = op.read_with("normal_file").range(1024..2048).call()?;
assert_eq!(bs.len(), 1024, "read size");
assert_eq!(
format!("{:x}", Sha256::digest(&bs)),
diff --git a/core/tests/behavior/blocking_write.rs
b/core/tests/behavior/blocking_write.rs
index 06d44a1b9..eb6932342 100644
--- a/core/tests/behavior/blocking_write.rs
+++ b/core/tests/behavior/blocking_write.rs
@@ -229,7 +229,7 @@ pub fn test_blocking_read_range(op: BlockingOperator) ->
Result<()> {
op.write(&path, content.clone())
.expect("write must succeed");
- let bs = op.range_read(&path, offset..offset + length)?;
+ let bs = op.read_with(&path).range(offset..offset + length).call()?;
assert_eq!(bs.len() as u64, length, "read size");
assert_eq!(
format!("{:x}", Sha256::digest(&bs)),
@@ -258,7 +258,7 @@ pub fn test_blocking_read_large_range(op: BlockingOperator)
-> Result<()> {
op.write(&path, content.clone())
.expect("write must succeed");
- let bs = op.range_read(&path, offset..u32::MAX as u64)?;
+ let bs = op.read_with(&path).range(offset..u32::MAX as u64).call()?;
assert_eq!(
bs.len() as u64,
size as u64 - offset,
@@ -298,7 +298,10 @@ pub fn test_blocking_fuzz_range_reader(op:
BlockingOperator) -> Result<()> {
.expect("write must succeed");
let mut fuzzer = ObjectReaderFuzzer::new(&path, content.clone(), 0,
content.len());
- let mut o = op.range_reader(&path, 0..content.len() as u64)?;
+ let mut o = op
+ .reader_with(&path)
+ .range(0..content.len() as u64)
+ .call()?;
for _ in 0..100 {
match fuzzer.fuzz() {
@@ -335,7 +338,7 @@ pub fn test_blocking_fuzz_offset_reader(op:
BlockingOperator) -> Result<()> {
.expect("write must succeed");
let mut fuzzer = ObjectReaderFuzzer::new(&path, content.clone(), 0,
content.len());
- let mut o = op.range_reader(&path, 0..)?;
+ let mut o = op.reader_with(&path).range(0..).call()?;
for _ in 0..100 {
match fuzzer.fuzz() {
@@ -373,7 +376,10 @@ pub fn test_blocking_fuzz_part_reader(op:
BlockingOperator) -> Result<()> {
.expect("write must succeed");
let mut fuzzer = ObjectReaderFuzzer::new(&path, content, offset as usize,
length as usize);
- let mut o = op.range_reader(&path, offset..offset + length)?;
+ let mut o = op
+ .reader_with(&path)
+ .range(offset..offset + length)
+ .call()?;
for _ in 0..100 {
match fuzzer.fuzz() {