This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 581cd9f11 feat(types): remove blocking operation range_read and 
range_reader API  (#2912)
581cd9f11 is described below

commit 581cd9f11a5f3702022fac99b7f3402eccb81b20
Author: oowl <[email protected]>
AuthorDate: Wed Aug 23 18:44:01 2023 +0800

    feat(types): remove blocking operation range_read and range_reader API  
(#2912)
    
    * feat(types): add read_with and reader_with API to blocking operator
    
    * feat(types): remove blocking operator range_read and range_reader API
    
    * feat(types): fix code
---
 core/src/types/operator/blocking_operator.rs  | 116 ++++++++++++++------------
 core/src/types/operator/operator_functions.rs |  40 +++++++++
 core/tests/behavior/blocking_read_only.rs     |   6 +-
 core/tests/behavior/blocking_write.rs         |  16 ++--
 4 files changed, 118 insertions(+), 60 deletions(-)

diff --git a/core/src/types/operator/blocking_operator.rs 
b/core/src/types/operator/blocking_operator.rs
index 0650d2ba8..6d4b102f2 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use std::io::Read;
-use std::ops::RangeBounds;
 
 use bytes::Bytes;
 
@@ -230,54 +229,61 @@ impl BlockingOperator {
     /// # }
     /// ```
     pub fn read(&self, path: &str) -> Result<Vec<u8>> {
-        self.range_read(path, ..)
+        self.read_with(path).call()
     }
 
-    /// Read the specified range of path into a bytes.
+    /// Read the whole path into a bytes with extra options.
     ///
     /// This function will allocate a new bytes internally. For more precise 
memory control or
-    /// reading data lazily, please use [`BlockingOperator::range_reader`]
+    /// reading data lazily, please use [`BlockingOperator::reader`]
     ///
     /// # Examples
     ///
     /// ```no_run
-    /// # use std::io::Result;
-    /// # use opendal::BlockingOperator;
-    /// # use futures::TryStreamExt;
-    /// # use opendal::Scheme;
+    /// # use anyhow::Result;
+    /// use opendal::BlockingOperator;
+    /// use opendal::EntryMode;
+    /// use opendal::Metakey;
     /// # fn test(op: BlockingOperator) -> Result<()> {
-    /// let bs = op.range_read("path/to/file", 1024..2048)?;
+    /// let bs = op.read_with("path/to/file").range(0..10).call()?;
     /// # Ok(())
     /// # }
     /// ```
-    pub fn range_read(&self, path: &str, range: impl RangeBounds<u64>) -> 
Result<Vec<u8>> {
+    pub fn read_with(&self, path: &str) -> FunctionRead {
         let path = normalize_path(path);
 
-        if !validate_path(&path, EntryMode::FILE) {
-            return Err(
-                Error::new(ErrorKind::IsADirectory, "read path is a directory")
-                    .with_operation("BlockingOperator::range_read")
-                    .with_context("service", 
self.info().scheme().into_static())
-                    .with_context("path", &path),
-            );
-        }
+        FunctionRead(OperatorFunction::new(
+            self.inner().clone(),
+            path,
+            OpRead::default(),
+            |inner, path, args| {
+                if !validate_path(&path, EntryMode::FILE) {
+                    return Err(
+                        Error::new(ErrorKind::IsADirectory, "read path is a 
directory")
+                            .with_operation("BlockingOperator::read_with")
+                            .with_context("service", 
inner.info().scheme().into_static())
+                            .with_context("path", &path),
+                    );
+                }
 
-        let br = BytesRange::from(range);
-        let (rp, mut s) = self
-            .inner()
-            .blocking_read(&path, OpRead::new().with_range(br))?;
-
-        let mut buffer = 
Vec::with_capacity(rp.into_metadata().content_length() as usize);
-        s.read_to_end(&mut buffer).map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "blocking range read failed")
-                .with_operation("BlockingOperator::range_read")
-                .with_context("service", self.info().scheme().into_static())
-                .with_context("path", path)
-                .with_context("range", br.to_string())
-                .set_source(err)
-        })?;
-
-        Ok(buffer)
+                let (rp, mut s) = inner.blocking_read(&path, args)?;
+                let mut buffer = 
Vec::with_capacity(rp.into_metadata().content_length() as usize);
+
+                match s.read_to_end(&mut buffer) {
+                    Ok(n) => {
+                        buffer.truncate(n);
+                        Ok(buffer)
+                    }
+                    Err(err) => Err(
+                        Error::new(ErrorKind::Unexpected, "blocking read_with 
failed")
+                            .with_operation("BlockingOperator::read_with")
+                            .with_context("service", 
inner.info().scheme().into_static())
+                            .with_context("path", &path)
+                            .set_source(err),
+                    ),
+                }
+            },
+        ))
     }
 
     /// Create a new reader which can read the whole path.
@@ -294,37 +300,43 @@ impl BlockingOperator {
     /// # }
     /// ```
     pub fn reader(&self, path: &str) -> Result<BlockingReader> {
-        self.range_reader(path, ..)
+        self.reader_with(path).call()
     }
 
-    /// Create a new reader which can read the specified range.
+    /// Create a new reader with extra options
     ///
     /// # Examples
     ///
     /// ```no_run
-    /// # use std::io::Result;
-    /// # use opendal::BlockingOperator;
-    /// # use futures::TryStreamExt;
+    /// # use anyhow::Result;
+    /// use opendal::BlockingOperator;
+    /// use opendal::EntryMode;
+    /// use opendal::Metakey;
     /// # fn test(op: BlockingOperator) -> Result<()> {
-    /// let r = op.range_reader("path/to/file", 1024..2048)?;
+    /// let r = op.reader_with("path/to/file").range(0..10).call()?;
     /// # Ok(())
     /// # }
     /// ```
-    pub fn range_reader(&self, path: &str, range: impl RangeBounds<u64>) -> 
Result<BlockingReader> {
+    pub fn reader_with(&self, path: &str) -> FunctionReader {
         let path = normalize_path(path);
 
-        if !validate_path(&path, EntryMode::FILE) {
-            return Err(
-                Error::new(ErrorKind::IsADirectory, "read path is a directory")
-                    .with_operation("BlockingOperator::range_reader")
-                    .with_context("service", 
self.info().scheme().into_static())
-                    .with_context("path", &path),
-            );
-        }
-
-        let op = OpRead::new().with_range(range.into());
+        FunctionReader(OperatorFunction::new(
+            self.inner().clone(),
+            path,
+            OpRead::default(),
+            |inner, path, args| {
+                if !validate_path(&path, EntryMode::FILE) {
+                    return Err(
+                        Error::new(ErrorKind::IsADirectory, "reader path is a 
directory")
+                            .with_operation("BlockingOperator::reader_with")
+                            .with_context("service", 
inner.info().scheme().into_static())
+                            .with_context("path", &path),
+                    );
+                }
 
-        BlockingReader::create(self.inner().clone(), &path, op)
+                BlockingReader::create(inner.clone(), &path, args)
+            },
+        ))
     }
 
     /// Write bytes into given path.
diff --git a/core/src/types/operator/operator_functions.rs 
b/core/src/types/operator/operator_functions.rs
index 2164bcafb..1000b7094 100644
--- a/core/src/types/operator/operator_functions.rs
+++ b/core/src/types/operator/operator_functions.rs
@@ -19,6 +19,8 @@
 //!
 //! By using functions, users can add more options for operation.
 
+use std::ops::RangeBounds;
+
 use bytes::Bytes;
 use flagset::FlagSet;
 
@@ -189,3 +191,41 @@ impl FunctionLister {
         self.0.call()
     }
 }
+
+/// Function that generated by [`BlockingOperator::read_with`].
+///
+/// Users can add more options by public functions provided by this struct.
+pub struct FunctionRead(pub(crate) OperatorFunction<OpRead, Vec<u8>>);
+
+impl FunctionRead {
+    /// Set the range for this operation.
+    pub fn range(mut self, range: impl RangeBounds<u64>) -> Self {
+        self.0 = self.0.map_args(|args| args.with_range(range.into()));
+        self
+    }
+
+    /// Call the function to consume all the input and generate a
+    /// result.
+    pub fn call(self) -> Result<Vec<u8>> {
+        self.0.call()
+    }
+}
+
+/// Function that generated by [`BlockingOperator::reader_with`].
+///
+/// Users can add more options by public functions provided by this struct.
+pub struct FunctionReader(pub(crate) OperatorFunction<OpRead, BlockingReader>);
+
+impl FunctionReader {
+    /// Set the range for this operation.
+    pub fn range(mut self, range: impl RangeBounds<u64>) -> Self {
+        self.0 = self.0.map_args(|args| args.with_range(range.into()));
+        self
+    }
+
+    /// Call the function to consume all the input and generate a
+    /// result.
+    pub fn call(self) -> Result<BlockingReader> {
+        self.0.call()
+    }
+}
diff --git a/core/tests/behavior/blocking_read_only.rs 
b/core/tests/behavior/blocking_read_only.rs
index 6c4f5e0ab..f351d13bf 100644
--- a/core/tests/behavior/blocking_read_only.rs
+++ b/core/tests/behavior/blocking_read_only.rs
@@ -34,7 +34,7 @@ pub fn behavior_blocking_read_only_tests(op: &Operator) -> 
Vec<Trial> {
         test_blocking_read_only_stat_special_chars,
         test_blocking_read_only_stat_not_exist,
         test_blocking_read_only_read_full,
-        test_blocking_read_only_read_range,
+        test_blocking_read_only_read_with_range,
         test_blocking_read_only_read_not_exist
     )
 }
@@ -88,8 +88,8 @@ pub fn test_blocking_read_only_read_full(op: 
BlockingOperator) -> Result<()> {
 }
 
 /// Read full content should match.
-pub fn test_blocking_read_only_read_range(op: BlockingOperator) -> Result<()> {
-    let bs = op.range_read("normal_file", 1024..2048)?;
+pub fn test_blocking_read_only_read_with_range(op: BlockingOperator) -> 
Result<()> {
+    let bs = op.read_with("normal_file").range(1024..2048).call()?;
     assert_eq!(bs.len(), 1024, "read size");
     assert_eq!(
         format!("{:x}", Sha256::digest(&bs)),
diff --git a/core/tests/behavior/blocking_write.rs 
b/core/tests/behavior/blocking_write.rs
index 06d44a1b9..eb6932342 100644
--- a/core/tests/behavior/blocking_write.rs
+++ b/core/tests/behavior/blocking_write.rs
@@ -229,7 +229,7 @@ pub fn test_blocking_read_range(op: BlockingOperator) -> 
Result<()> {
     op.write(&path, content.clone())
         .expect("write must succeed");
 
-    let bs = op.range_read(&path, offset..offset + length)?;
+    let bs = op.read_with(&path).range(offset..offset + length).call()?;
     assert_eq!(bs.len() as u64, length, "read size");
     assert_eq!(
         format!("{:x}", Sha256::digest(&bs)),
@@ -258,7 +258,7 @@ pub fn test_blocking_read_large_range(op: BlockingOperator) 
-> Result<()> {
     op.write(&path, content.clone())
         .expect("write must succeed");
 
-    let bs = op.range_read(&path, offset..u32::MAX as u64)?;
+    let bs = op.read_with(&path).range(offset..u32::MAX as u64).call()?;
     assert_eq!(
         bs.len() as u64,
         size as u64 - offset,
@@ -298,7 +298,10 @@ pub fn test_blocking_fuzz_range_reader(op: 
BlockingOperator) -> Result<()> {
         .expect("write must succeed");
 
     let mut fuzzer = ObjectReaderFuzzer::new(&path, content.clone(), 0, 
content.len());
-    let mut o = op.range_reader(&path, 0..content.len() as u64)?;
+    let mut o = op
+        .reader_with(&path)
+        .range(0..content.len() as u64)
+        .call()?;
 
     for _ in 0..100 {
         match fuzzer.fuzz() {
@@ -335,7 +338,7 @@ pub fn test_blocking_fuzz_offset_reader(op: 
BlockingOperator) -> Result<()> {
         .expect("write must succeed");
 
     let mut fuzzer = ObjectReaderFuzzer::new(&path, content.clone(), 0, 
content.len());
-    let mut o = op.range_reader(&path, 0..)?;
+    let mut o = op.reader_with(&path).range(0..).call()?;
 
     for _ in 0..100 {
         match fuzzer.fuzz() {
@@ -373,7 +376,10 @@ pub fn test_blocking_fuzz_part_reader(op: 
BlockingOperator) -> Result<()> {
         .expect("write must succeed");
 
     let mut fuzzer = ObjectReaderFuzzer::new(&path, content, offset as usize, 
length as usize);
-    let mut o = op.range_reader(&path, offset..offset + length)?;
+    let mut o = op
+        .reader_with(&path)
+        .range(offset..offset + length)
+        .call()?;
 
     for _ in 0..100 {
         match fuzzer.fuzz() {

Reply via email to