This is an automated email from the ASF dual-hosted git repository.

junouyang pushed a commit to branch feat/blockong-operator-range-read
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit fbea947632bb199efc6e8500447b1f88ecc364b4
Author: owl <[email protected]>
AuthorDate: Wed Aug 23 17:34:40 2023 +0800

    feat(types): add read_with and reader_with API to blocking operator
---
 core/src/types/operator/blocking_operator.rs  | 93 ++++++++++++++++++++++++++-
 core/src/types/operator/operator_functions.rs | 40 ++++++++++++
 2 files changed, 132 insertions(+), 1 deletion(-)

diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs
index 0650d2ba8..012a27df4 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -280,6 +280,61 @@ impl BlockingOperator {
         Ok(buffer)
     }
 
+    /// Read the whole path into bytes with extra options.
+    ///
+    /// This function will allocate a new byte buffer internally. For more precise memory control
+    /// or reading data lazily, please use [`BlockingOperator::reader_with`].
+    ///
+    /// # Examples
+    /// 
+    /// ```no_run
+    /// # use anyhow::Result;
+    /// use opendal::BlockingOperator;
+    /// use opendal::EntryMode;
+    /// use opendal::Metakey;
+    /// # fn test(op: BlockingOperator) -> Result<()> {
+    /// let bs = op.read_with("path/to/file").range(0..10).call()?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn read_with(&self, path: &str) -> FunctionRead {
+        let path = normalize_path(path);
+
+        FunctionRead(OperatorFunction::new(
+            self.inner().clone(),
+            path,
+            OpRead::default(),
+            |inner, path, args| {
+                if !validate_path(&path, EntryMode::FILE) {
+                    return Err(
+                        Error::new(ErrorKind::IsADirectory, "read path is a 
directory")
+                            .with_operation("BlockingOperator::read_with")
+                            .with_context("service", 
inner.info().scheme().into_static())
+                            .with_context("path", &path)
+                    );
+                }
+
+                let (rp, mut s) = inner.blocking_read(&path, args)?;
+                let mut buffer = 
Vec::with_capacity(rp.into_metadata().content_length() as usize);
+                
+                match s.read_to_end(&mut buffer) {
+                    Ok(n) => {
+                        buffer.truncate(n);
+                        Ok(buffer)
+                    },
+                    Err(err) => Err(
+                        Error::new(ErrorKind::Unexpected, "blocking read_with 
failed")
+                        .with_operation("BlockingOperator::read_with")
+                        .with_context("service", 
inner.info().scheme().into_static())
+                        .with_context("path", &path)
+                        .set_source(err)
+                    ),
+                }
+            },
+        ))
+    }
+
+
     /// Create a new reader which can read the whole path.
     ///
     /// # Examples
@@ -327,6 +382,42 @@ impl BlockingOperator {
         BlockingReader::create(self.inner().clone(), &path, op)
     }
 
+    /// Create a new reader with extra options.
+    ///
+    /// # Examples
+    /// 
+    /// ```no_run
+    /// # use anyhow::Result;
+    /// use opendal::BlockingOperator;
+    /// use opendal::EntryMode;
+    /// use opendal::Metakey;
+    /// # fn test(op: BlockingOperator) -> Result<()> {
+    /// let r = op.reader_with("path/to/file").range(0..10).call()?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn reader_with(&self, path: &str) -> FunctionReader {
+        let path = normalize_path(path);
+
+        FunctionReader(OperatorFunction::new(
+            self.inner().clone(),
+            path,
+            OpRead::default(),
+            |inner, path, args| {
+                if !validate_path(&path, EntryMode::FILE) {
+                    return Err(
+                        Error::new(ErrorKind::IsADirectory, "reader path is a 
directory")
+                            .with_operation("BlockingOperator::reader_with")
+                            .with_context("service", 
inner.info().scheme().into_static())
+                            .with_context("path", &path)
+                    );
+                }
+
+                BlockingReader::create(inner.clone(), &path, args)
+            }
+        ))
+    }
+
     /// Write bytes into given path.
     ///
     /// # Notes
@@ -1036,4 +1127,4 @@ impl BlockingOperator {
             },
         ))
     }
-}
+}
\ No newline at end of file
diff --git a/core/src/types/operator/operator_functions.rs b/core/src/types/operator/operator_functions.rs
index 2164bcafb..1000b7094 100644
--- a/core/src/types/operator/operator_functions.rs
+++ b/core/src/types/operator/operator_functions.rs
@@ -19,6 +19,8 @@
 //!
 //! By using functions, users can add more options for operation.
 
+use std::ops::RangeBounds;
+
 use bytes::Bytes;
 use flagset::FlagSet;
 
@@ -189,3 +191,41 @@ impl FunctionLister {
         self.0.call()
     }
 }
+
+/// Function that is generated by [`BlockingOperator::read_with`].
+///
+/// Users can add more options by public functions provided by this struct.
+pub struct FunctionRead(pub(crate) OperatorFunction<OpRead, Vec<u8>>);
+
+impl FunctionRead {
+    /// Set the range for this operation.
+    pub fn range(mut self, range: impl RangeBounds<u64>) -> Self {
+        self.0 = self.0.map_args(|args| args.with_range(range.into()));
+        self
+    }
+
+    /// Call the function to consume all the input and generate a
+    /// result.
+    pub fn call(self) -> Result<Vec<u8>> {
+        self.0.call()
+    }
+}
+
+/// Function that is generated by [`BlockingOperator::reader_with`].
+///
+/// Users can add more options by public functions provided by this struct.
+pub struct FunctionReader(pub(crate) OperatorFunction<OpRead, BlockingReader>);
+
+impl FunctionReader {
+    /// Set the range for this operation.
+    pub fn range(mut self, range: impl RangeBounds<u64>) -> Self {
+        self.0 = self.0.map_args(|args| args.with_range(range.into()));
+        self
+    }
+
+    /// Call the function to consume all the input and generate a
+    /// result.
+    pub fn call(self) -> Result<BlockingReader> {
+        self.0.call()
+    }
+}

Reply via email to