This is an automated email from the ASF dual-hosted git repository. junouyang pushed a commit to branch feat/blockong-operator-range-read in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit fbea947632bb199efc6e8500447b1f88ecc364b4 Author: owl <[email protected]> AuthorDate: Wed Aug 23 17:34:40 2023 +0800 feat(types): add read_with and reader_with API to blocking operator --- core/src/types/operator/blocking_operator.rs | 93 ++++++++++++++++++++++++++- core/src/types/operator/operator_functions.rs | 40 ++++++++++++ 2 files changed, 132 insertions(+), 1 deletion(-) diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index 0650d2ba8..012a27df4 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -280,6 +280,61 @@ impl BlockingOperator { Ok(buffer) } + /// Read the whole path into a bytes with extra options. + /// + /// This function will allocate a new bytes internally. For more precise memory control or + /// reading data lazily, please use [`BlockingOperator::reader`] + /// + /// # Examples + /// + /// ```no_run + /// # use anyhow::Result; + /// use opendal::BlockingOperator; + /// use opendal::EntryMode; + /// use opendal::Metakey; + /// # fn test(op: BlockingOperator) -> Result<()> { + /// let bs = op.read_with("path/to/file").range(0..10).call()?; + /// # Ok(()) + /// # } + /// ``` + pub fn read_with(&self, path: &str) -> FunctionRead { + let path = normalize_path(path); + + FunctionRead(OperatorFunction::new( + self.inner().clone(), + path, + OpRead::default(), + |inner, path, args| { + if !validate_path(&path, EntryMode::FILE) { + return Err( + Error::new(ErrorKind::IsADirectory, "read path is a directory") + .with_operation("BlockingOperator::read_with") + .with_context("service", inner.info().scheme().into_static()) + .with_context("path", &path) + ); + } + + let (rp, mut s) = inner.blocking_read(&path, args)?; + let mut buffer = Vec::with_capacity(rp.into_metadata().content_length() as usize); + + match s.read_to_end(&mut buffer) { + Ok(n) => { + buffer.truncate(n); + Ok(buffer) + }, + Err(err) => Err( + Error::new(ErrorKind::Unexpected, "blocking read_with failed") + .with_operation("BlockingOperator::read_with") + .with_context("service", inner.info().scheme().into_static()) + .with_context("path", &path) + .set_source(err) + ), + } + }, + )) + } + + /// Create a new reader which can read the whole path. /// /// # Examples @@ -327,6 +382,42 @@ impl BlockingOperator { BlockingReader::create(self.inner().clone(), &path, op) } + /// Create a new reader with extra options + /// + /// # Examples + /// + /// ```no_run + /// # use anyhow::Result; + /// use opendal::BlockingOperator; + /// use opendal::EntryMode; + /// use opendal::Metakey; + /// # fn test(op: BlockingOperator) -> Result<()> { + /// let r = op.reader_with("path/to/file").range(0..10).call()?; + /// # Ok(()) + /// # } + /// ``` + pub fn reader_with(&self, path: &str) -> FunctionReader { + let path = normalize_path(path); + + FunctionReader(OperatorFunction::new( + self.inner().clone(), + path, + OpRead::default(), + |inner, path, args| { + if !validate_path(&path, EntryMode::FILE) { + return Err( + Error::new(ErrorKind::IsADirectory, "reader path is a directory") + .with_operation("BlockingOperator::reader_with") + .with_context("service", inner.info().scheme().into_static()) + .with_context("path", &path) + ); + } + + BlockingReader::create(inner.clone(), &path, args) + } + )) + } + /// Write bytes into given path. /// /// # Notes @@ -1036,4 +1127,4 @@ impl BlockingOperator { }, )) } -} +} \ No newline at end of file diff --git a/core/src/types/operator/operator_functions.rs b/core/src/types/operator/operator_functions.rs index 2164bcafb..1000b7094 100644 --- a/core/src/types/operator/operator_functions.rs +++ b/core/src/types/operator/operator_functions.rs @@ -19,6 +19,8 @@ //! //! By using functions, users can add more options for operation. +use std::ops::RangeBounds; + use bytes::Bytes; use flagset::FlagSet; @@ -189,3 +191,41 @@ impl FunctionLister { self.0.call() } } + +/// Function that generated by [`BlockingOperator::read_with`]. +/// +/// Users can add more options by public functions provided by this struct. +pub struct FunctionRead(pub(crate) OperatorFunction<OpRead, Vec<u8>>); + +impl FunctionRead { + /// Set the range for this operation. + pub fn range(mut self, range: impl RangeBounds<u64>) -> Self { + self.0 = self.0.map_args(|args| args.with_range(range.into())); + self + } + + /// Call the function to consume all the input and generate a + /// result. + pub fn call(self) -> Result<Vec<u8>> { + self.0.call() + } +} + +/// Function that generated by [`BlockingOperator::reader_with`]. +/// +/// Users can add more options by public functions provided by this struct. +pub struct FunctionReader(pub(crate) OperatorFunction<OpRead, BlockingReader>); + +impl FunctionReader { + /// Set the range for this operation. + pub fn range(mut self, range: impl RangeBounds<u64>) -> Self { + self.0 = self.0.map_args(|args| args.with_range(range.into())); + self + } + + /// Call the function to consume all the input and generate a + /// result. + pub fn call(self) -> Result<BlockingReader> { + self.0.call() + } +}
