This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch list-with-metakey-for-blocking in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 5484b900441cbf4eac2ddb224dc998104c8d36b1 Author: Xuanwo <[email protected]> AuthorDate: Mon Aug 14 14:03:54 2023 +0800 Save work Signed-off-by: Xuanwo <[email protected]> --- bindings/c/src/lib.rs | 2 +- bindings/haskell/src/lib.rs | 4 +- bindings/nodejs/src/lib.rs | 14 +- bindings/python/src/lib.rs | 10 +- core/src/types/operator/blocking_operator.rs | 196 +++++++++++++++++--------- core/src/types/operator/operator_functions.rs | 38 +++++ core/tests/behavior/blocking_list.rs | 9 +- 7 files changed, 198 insertions(+), 75 deletions(-) diff --git a/bindings/c/src/lib.rs b/bindings/c/src/lib.rs index 6e5130265..6535b7965 100644 --- a/bindings/c/src/lib.rs +++ b/bindings/c/src/lib.rs @@ -485,7 +485,7 @@ pub unsafe extern "C" fn opendal_operator_blocking_list( let op = (*ptr).as_ref(); let path = unsafe { std::ffi::CStr::from_ptr(path).to_str().unwrap() }; - match op.list(path) { + match op.lister(path) { Ok(lister) => opendal_result_list { lister: Box::into_raw(Box::new(opendal_blocking_lister::new(lister))), code: opendal_code::OPENDAL_OK, diff --git a/bindings/haskell/src/lib.rs b/bindings/haskell/src/lib.rs index b0aaf3d56..3b0233388 100644 --- a/bindings/haskell/src/lib.rs +++ b/bindings/haskell/src/lib.rs @@ -501,7 +501,7 @@ pub unsafe extern "C" fn blocking_list( } }; - let res = match op.list(path_str) { + let res = match op.lister(path_str) { Ok(lister) => FFIResult::ok(Box::into_raw(Box::new(lister))), Err(e) => FFIResult::err_with_source("Failed to list", e), }; @@ -540,7 +540,7 @@ pub unsafe extern "C" fn blocking_scan( } }; - let res = match op.scan(path_str) { + let res = match op.lister_with(path_str).delimiter("").call() { Ok(lister) => FFIResult::ok(Box::into_raw(Box::new(lister))), Err(e) => FFIResult::err_with_source("Failed to scan", e), }; diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs index dbab75648..b615bc7e9 100644 --- a/bindings/nodejs/src/lib.rs +++ 
b/bindings/nodejs/src/lib.rs @@ -336,7 +336,12 @@ impl Operator {
#[napi] pub fn scan_sync(&self, path: String) -> Result<BlockingLister> { Ok(BlockingLister( - self.0.blocking().scan(&path).map_err(format_napi_error)?, + self.0 + .blocking() + .lister_with(&path) + .delimiter("") + .call() + .map_err(format_napi_error)?, )) } @@ -443,7 +448,12 @@ impl Operator { #[napi] pub fn list_sync(&self, path: String) -> Result<BlockingLister> { Ok(BlockingLister( - self.0.blocking().scan(&path).map_err(format_napi_error)?, + self.0 + .blocking() + .lister_with(&path) + .delimiter("/") + .call() + .map_err(format_napi_error)?, )) } diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index 7d58d2773..81189b3b0 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -146,12 +146,18 @@ impl Operator { /// List current dir path. pub fn list(&self, path: &str) -> PyResult<BlockingLister> { - Ok(BlockingLister(self.0.list(path).map_err(format_pyerr)?)) + Ok(BlockingLister(self.0.lister(path).map_err(format_pyerr)?)) } /// List dir in flat way. pub fn scan(&self, path: &str) -> PyResult<BlockingLister> { - Ok(BlockingLister(self.0.scan(path).map_err(format_pyerr)?)) + Ok(BlockingLister( + self.0 + .lister_with(path) + .delimiter("") + .call() + .map_err(format_pyerr)?, + )) } fn __repr__(&self) -> String { diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index 5f8c6d279..143592762 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -113,18 +113,13 @@ impl BlockingOperator { /// # Operator blocking API. impl BlockingOperator { - /// Get current path's metadata **without cache** directly. + /// Get current path's metadata. /// /// # Notes /// - /// Use `stat` if you: - /// - /// - Want detect the outside changes of path. - /// - Don't want to read from cached metadata. - /// - /// You may want to use `metadata` if you are working with entries - /// returned by [`Lister`].
It's highly possible that metadata - /// you want has already been cached. + /// To fetch metadata of entries returned by [`BlockingLister`], it's better to use + /// [`BlockingOperator::lister_with`] with a `metakey` query like + /// `Metakey::ContentLength | Metakey::LastModified` so that we can avoid extra requests. /// /// # Examples /// @@ -134,7 +129,8 @@ impl BlockingOperator { /// # use opendal::BlockingOperator; /// use opendal::ErrorKind; /// # /// # fn test(op: BlockingOperator) -> Result<()> { + /// // `stat` blocks the current thread, so no async runtime is needed. /// if let Err(e) = op.stat("test") { /// if e.kind() == ErrorKind::NotFound { /// println!("file not exist") @@ -697,7 +693,7 @@ impl BlockingOperator { return self.delete(path); } - let obs = self.scan(path)?; + let obs = self.lister_with(path).delimiter("").call()?; for v in obs { match v { @@ -715,30 +711,44 @@ impl BlockingOperator { Ok(()) } - /// List current dir path. + /// List entries within a given directory as an iterator. /// /// This function will create a new handle to list entries. /// - /// An error will be returned if path doesn't end with `/`. + /// An error will be returned if given path doesn't end with `/`. + /// + /// # Notes + /// + /// ## Listing recursively + /// + /// This function only reads the children of the given directory. To read + /// all entries recursively, use [`BlockingOperator::lister_with`] and `delimiter("")` + /// instead. + /// + /// ## Metadata + /// + /// The only metadata that is guaranteed to be available is the `Mode`. + /// For fetching more metadata, please use [`BlockingOperator::lister_with`] and `metakey`.
/// /// # Examples /// /// ```no_run - /// # use opendal::Result; + /// # use anyhow::Result; /// # use futures::io; - /// # use opendal::BlockingOperator; - /// # use opendal::EntryMode; - /// # fn test(op: BlockingOperator) -> Result<()> { - /// let mut ds = op.list("path/to/dir/")?; - /// while let Some(mut entry) = ds.next() { - /// let entry = entry?; - /// let meta = entry.metadata(); - /// match meta.mode() { + /// use futures::TryStreamExt; + /// use opendal::EntryMode; + /// use opendal::Metakey; + /// use opendal::BlockingOperator; + /// # fn test(op: BlockingOperator) -> Result<()> { + /// let ds = op.lister("path/to/dir/")?; + /// for de in ds { + /// let de = de?; + /// match de.metadata().mode() { /// EntryMode::FILE => { /// println!("Handling file") /// } /// EntryMode::DIR => { /// println!("Handling dir like start a new list via de.path()") /// } /// EntryMode::Unknown => continue, /// } @@ -746,52 +756,103 @@ impl BlockingOperator { /// # Ok(()) /// # } /// ``` - pub fn list(&self, path: &str) -> Result<BlockingLister> { - let path = normalize_path(path); - - if !validate_path(&path, EntryMode::DIR) { - return Err(Error::new( - ErrorKind::NotADirectory, - "the path trying to list should end with `/`", - ) - .with_operation("BlockingOperator::list") - .with_context("service", self.info().scheme().into_static()) - .with_context("path", &path)); - } - - let (_, pager) = self.inner().blocking_list(&path, OpList::new())?; - Ok(BlockingLister::new(pager)) + pub fn lister(&self, path: &str) -> Result<BlockingLister> { + self.lister_with(path).call() } - /// List dir in flat way. + /// List entries within a given directory as an iterator with options. /// - /// Also, this function can be used to list a prefix. + /// This function will create a new handle to list entries. /// /// An error will be returned if given path doesn't end with `/`.
/// - /// # Notes + /// # Examples /// - /// - `scan` will not return the prefix itself. - /// - `scan` is an alias of `list_with(OpList::new().with_delimiter(""))` + /// ## List current dir /// - /// # Examples + /// ```no_run + /// # use anyhow::Result; + /// # use futures::io; + /// use futures::TryStreamExt; + /// use opendal::EntryMode; + /// use opendal::Metakey; + /// use opendal::BlockingOperator; + /// # fn test(op: BlockingOperator) -> Result<()> { + /// let ds = op + /// .lister_with("path/to/dir/") + /// .limit(10) + /// .start_after("start") + /// .call()?; + /// for entry in ds { + /// let entry = entry?; + /// match entry.metadata().mode() { + /// EntryMode::FILE => { + /// println!("Handling file {}", entry.path()) + /// } + /// EntryMode::DIR => { + /// println!("Handling dir {}", entry.path()) + /// } + /// EntryMode::Unknown => continue, + /// } + /// } + /// # Ok(()) + /// # } + /// ``` + /// + /// ## List all files recursively /// /// ```no_run - /// # use opendal::Result; + /// # use anyhow::Result; /// # use futures::io; - /// # use opendal::BlockingOperator; - /// # use opendal::EntryMode; + /// use futures::TryStreamExt; + /// use opendal::EntryMode; + /// use opendal::Metakey; + /// use opendal::BlockingOperator; /// # fn test(op: BlockingOperator) -> Result<()> { - /// let mut ds = op.list("path/to/dir/")?; - /// while let Some(mut entry) = ds.next() { + /// let ds = op.lister_with("path/to/dir/").delimiter("").call()?; + /// for entry in ds { + /// let entry = entry?; + /// match entry.metadata().mode() { + /// EntryMode::FILE => { + /// println!("Handling file {}", entry.path()) + /// } + /// EntryMode::DIR => { + /// println!("Handling dir {}", entry.path()) + /// } + /// EntryMode::Unknown => continue, + /// } + /// } + /// # Ok(()) + /// # } + /// ``` + /// + /// ## List files with required metadata + /// + /// ```no_run + /// # use anyhow::Result; + /// # use futures::io; + /// use futures::TryStreamExt; + /// use
opendal::EntryMode; + /// use opendal::Metakey; + /// use opendal::BlockingOperator; + /// # fn test(op: BlockingOperator) -> Result<()> { + /// let ds = op + /// .lister_with("path/to/dir/") + /// .metakey(Metakey::ContentLength | Metakey::LastModified) + /// .call()?; + /// for entry in ds { /// let entry = entry?; /// let meta = entry.metadata(); /// match meta.mode() { /// EntryMode::FILE => { - /// println!("Handling file") + /// println!( + /// "Handling file {} with size {}", + /// entry.path(), + /// meta.content_length() + /// ) /// } /// EntryMode::DIR => { - /// println!("Handling dir like start a new list via meta.path()") + /// println!("Handling dir {}", entry.path()) /// } /// EntryMode::Unknown => continue, /// } @@ -799,22 +860,27 @@ impl BlockingOperator { /// # Ok(()) /// # } /// ``` - pub fn scan(&self, path: &str) -> Result<BlockingLister> { + pub fn lister_with(&self, path: &str) -> FunctionLister { let path = normalize_path(path); - if !validate_path(&path, EntryMode::DIR) { - return Err(Error::new( - ErrorKind::NotADirectory, - "the path trying to scan should end with `/`", - ) - .with_operation("BlockingOperator::list") - .with_context("service", self.info().scheme().into_static()) - .with_context("path", path)); - } + FunctionLister(OperatorFunction::new( + self.inner().clone(), + path, + OpList::default(), + |inner, path, args| { + if !validate_path(&path, EntryMode::DIR) { + return Err( + Error::new(ErrorKind::NotADirectory, "the path trying to list should end with `/`") + .with_operation("BlockingOperator::list") + .with_context("service", inner.info().scheme().into_static()) + .with_context("path", &path), + ); + } - let (_, pager) = self - .inner() - .blocking_list(&path, OpList::new().with_delimiter(""))?; - Ok(BlockingLister::new(pager)) + let (_, pager) = inner.blocking_list(&path, args)?; + + Ok(BlockingLister::new(pager)) + }, + )) } } diff --git a/core/src/types/operator/operator_functions.rs
b/core/src/types/operator/operator_functions.rs index 0d97e2fed..5734950e0 100644 --- a/core/src/types/operator/operator_functions.rs +++ b/core/src/types/operator/operator_functions.rs @@ -20,6 +20,7 @@ //! By using functions, users can add more options for operation. use bytes::Bytes; +use flagset::FlagSet; use crate::raw::*; use crate::*; @@ -114,3 +115,40 @@ impl FunctionDelete { self.0.call() } } + +/// Function generated by [`BlockingOperator::lister_with`]. +/// +/// Users can add more options by public functions provided by this struct. +pub struct FunctionLister(pub(crate) OperatorFunction<OpList, BlockingLister>); + +impl FunctionLister { + /// Change the limit of this list operation. + pub fn limit(mut self, v: usize) -> Self { + self.0 = self.0.map_args(|args| args.with_limit(v)); + self + } + + /// Change the start_after of this list operation. + pub fn start_after(mut self, v: &str) -> Self { + self.0 = self.0.map_args(|args| args.with_start_after(v)); + self + } + + /// Change the delimiter. The default delimiter is `/`; use `""` to list recursively. + pub fn delimiter(mut self, v: &str) -> Self { + self.0 = self.0.map_args(|args| args.with_delimiter(v)); + self + } + + /// Change the metakey. The default metakey is `Metakey::Mode`. + pub fn metakey(mut self, v: impl Into<FlagSet<Metakey>>) -> Self { + self.0 = self.0.map_args(|args| args.with_metakey(v)); + self + } + + /// Call the function to consume all the input and generate a + /// result.
+ pub fn call(self) -> Result<BlockingLister> { + self.0.call() + } +} diff --git a/core/tests/behavior/blocking_list.rs b/core/tests/behavior/blocking_list.rs index 745396848..2f6e9e534 100644 --- a/core/tests/behavior/blocking_list.rs +++ b/core/tests/behavior/blocking_list.rs @@ -48,7 +48,7 @@ pub fn test_blocking_list_dir(op: BlockingOperator) -> Result<()> { op.write(&path, content).expect("write must succeed"); - let obs = op.list(&format!("{parent}/"))?; + let obs = op.lister(&format!("{parent}/"))?; let mut found = false; for de in obs { let de = de?; @@ -71,7 +71,7 @@ pub fn test_blocking_list_dir(op: BlockingOperator) -> Result<()> { pub fn test_blocking_list_non_exist_dir(op: BlockingOperator) -> Result<()> { let dir = format!("{}/", uuid::Uuid::new_v4()); - let obs = op.list(&dir)?; + let obs = op.lister(&dir)?; let mut objects = HashMap::new(); for de in obs { let de = de?; @@ -98,7 +98,10 @@ pub fn test_blocking_scan(op: BlockingOperator) -> Result<()> { } } - let w = op.scan(&format!("{parent}/x/"))?; + let w = op + .lister_with(&format!("{parent}/x/")) + .delimiter("") + .call()?; let actual = w .collect::<Vec<_>>() .into_iter()
