This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new f7486c4ce feat: Implement list with metakey for blocking (#2861)
f7486c4ce is described below
commit f7486c4cebd159549eb19873223c5e70e67589ee
Author: Xuanwo <[email protected]>
AuthorDate: Mon Aug 14 16:14:56 2023 +0800
feat: Implement list with metakey for blocking (#2861)
* Save work
Signed-off-by: Xuanwo <[email protected]>
* Implement list for blocking operator
Signed-off-by: Xuanwo <[email protected]>
* Fix docs
Signed-off-by: Xuanwo <[email protected]>
* Fix test
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
bindings/c/src/lib.rs | 2 +-
bindings/haskell/src/lib.rs | 4 +-
bindings/nodejs/src/lib.rs | 14 +-
bindings/python/src/lib.rs | 10 +-
core/src/types/operator/blocking_operator.rs | 347 +++++++++++++++++++++-----
core/src/types/operator/operator_functions.rs | 75 ++++++
core/tests/behavior/blocking_list.rs | 9 +-
7 files changed, 387 insertions(+), 74 deletions(-)
diff --git a/bindings/c/src/lib.rs b/bindings/c/src/lib.rs
index 6e5130265..6535b7965 100644
--- a/bindings/c/src/lib.rs
+++ b/bindings/c/src/lib.rs
@@ -485,7 +485,7 @@ pub unsafe extern "C" fn opendal_operator_blocking_list(
let op = (*ptr).as_ref();
let path = unsafe { std::ffi::CStr::from_ptr(path).to_str().unwrap() };
- match op.list(path) {
+ match op.lister(path) {
Ok(lister) => opendal_result_list {
lister:
Box::into_raw(Box::new(opendal_blocking_lister::new(lister))),
code: opendal_code::OPENDAL_OK,
diff --git a/bindings/haskell/src/lib.rs b/bindings/haskell/src/lib.rs
index b0aaf3d56..3b0233388 100644
--- a/bindings/haskell/src/lib.rs
+++ b/bindings/haskell/src/lib.rs
@@ -501,7 +501,7 @@ pub unsafe extern "C" fn blocking_list(
}
};
- let res = match op.list(path_str) {
+ let res = match op.lister(path_str) {
Ok(lister) => FFIResult::ok(Box::into_raw(Box::new(lister))),
Err(e) => FFIResult::err_with_source("Failed to list", e),
};
@@ -540,7 +540,7 @@ pub unsafe extern "C" fn blocking_scan(
}
};
- let res = match op.scan(path_str) {
+ let res = match op.lister_with(path_str).delimiter("").call() {
Ok(lister) => FFIResult::ok(Box::into_raw(Box::new(lister))),
Err(e) => FFIResult::err_with_source("Failed to scan", e),
};
diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs
index dbab75648..b615bc7e9 100644
--- a/bindings/nodejs/src/lib.rs
+++ b/bindings/nodejs/src/lib.rs
@@ -336,7 +336,12 @@ impl Operator {
#[napi]
pub fn scan_sync(&self, path: String) -> Result<BlockingLister> {
Ok(BlockingLister(
- self.0.blocking().scan(&path).map_err(format_napi_error)?,
+ self.0
+ .blocking()
+ .lister_with(&path)
+ .delimiter("")
+ .call()
+ .map_err(format_napi_error)?,
))
}
@@ -443,7 +448,12 @@ impl Operator {
#[napi]
pub fn list_sync(&self, path: String) -> Result<BlockingLister> {
Ok(BlockingLister(
- self.0.blocking().scan(&path).map_err(format_napi_error)?,
+ // List only the direct children of `path`; `scan_sync` is the
+ // recursive (delimiter "") variant.
+ self.0
+ .blocking()
+ .lister(&path)
+ .map_err(format_napi_error)?,
))
}
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs
index 7d58d2773..81189b3b0 100644
--- a/bindings/python/src/lib.rs
+++ b/bindings/python/src/lib.rs
@@ -146,12 +146,18 @@ impl Operator {
/// List current dir path.
pub fn list(&self, path: &str) -> PyResult<BlockingLister> {
- Ok(BlockingLister(self.0.list(path).map_err(format_pyerr)?))
+ Ok(BlockingLister(self.0.lister(path).map_err(format_pyerr)?))
}
/// List dir in flat way.
pub fn scan(&self, path: &str) -> PyResult<BlockingLister> {
- Ok(BlockingLister(self.0.scan(path).map_err(format_pyerr)?))
+ Ok(BlockingLister(
+ self.0
+ .lister_with(path)
+ .delimiter("")
+ .call()
+ .map_err(format_pyerr)?,
+ ))
}
fn __repr__(&self) -> String {
diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs
index 5f8c6d279..8a9027df5 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -100,8 +100,7 @@ impl BlockingOperator {
/// # use anyhow::Result;
/// use opendal::BlockingOperator;
///
- /// # #[tokio::main]
- /// # async fn test(op: BlockingOperator) -> Result<()> {
+ /// # fn test(op: BlockingOperator) -> Result<()> {
/// let info = op.info();
/// # Ok(())
/// # }
@@ -113,18 +112,13 @@ impl BlockingOperator {
/// # Operator blocking API.
impl BlockingOperator {
- /// Get current path's metadata **without cache** directly.
+ /// Get current path's metadata.
///
/// # Notes
///
- /// Use `stat` if you:
- ///
- /// - Want detect the outside changes of path.
- /// - Don't want to read from cached metadata.
- ///
- /// You may want to use `metadata` if you are working with entries
- /// returned by [`Lister`]. It's highly possible that metadata
- /// you want has already been cached.
+ /// To fetch metadata of entries returned by [`BlockingLister`], it's better to use
+ /// [`BlockingOperator::list_with`] or [`BlockingOperator::lister_with`] with a `metakey`
+ /// query like `Metakey::ContentLength | Metakey::LastModified` so that we can avoid
+ /// extra requests.
///
/// # Examples
///
@@ -195,7 +189,7 @@ impl BlockingOperator {
/// # use std::io::Result;
/// # use opendal::BlockingOperator;
/// # use futures::TryStreamExt;
- /// # async fn test(op: BlockingOperator) -> Result<()> {
+ /// # fn test(op: BlockingOperator) -> Result<()> {
/// op.create_dir("path/to/dir/")?;
/// # Ok(())
/// # }
@@ -251,7 +245,7 @@ impl BlockingOperator {
/// # use opendal::BlockingOperator;
/// # use futures::TryStreamExt;
/// # use opendal::Scheme;
- /// # async fn test(op: BlockingOperator) -> Result<()> {
+ /// # fn test(op: BlockingOperator) -> Result<()> {
/// let bs = op.range_read("path/to/file", 1024..2048)?;
/// # Ok(())
/// # }
@@ -485,7 +479,7 @@ impl BlockingOperator {
/// # use opendal::BlockingOperator;
/// use bytes::Bytes;
///
- /// # async fn test(op: BlockingOperator) -> Result<()> {
+ /// # fn test(op: BlockingOperator) -> Result<()> {
/// let bs = b"hello, world!".to_vec();
/// let _ = op
/// .write_with("hello.txt", bs)
@@ -697,7 +691,7 @@ impl BlockingOperator {
return self.delete(path);
}
- let obs = self.scan(path)?;
+ let obs = self.lister_with(path).delimiter("").call()?;
for v in obs {
match v {
@@ -715,30 +709,127 @@ impl BlockingOperator {
Ok(())
}
- /// List current dir path.
+ /// List entries within a given directory.
///
- /// This function will create a new handle to list entries.
+ /// # Notes
+ ///
+ /// ## Listing recursively
+ ///
+ /// This function only reads the children of the given directory. To read
+ /// all entries recursively, use `BlockingOperator::list_with("path").delimiter("")`
+ /// instead.
+ ///
+ /// ## Streaming
///
- /// An error will be returned if path doesn't end with `/`.
+ /// This function will read all entries in the given directory. It could
+ /// take a very long time and consume a lot of memory if the directory
+ /// contains a lot of entries.
+ ///
+ /// In order to avoid this, you can use [`BlockingOperator::lister`] to list entries in
+ /// a streaming way.
+ ///
+ /// ## Metadata
+ ///
+ /// The only metadata that is guaranteed to be available is the `Mode`.
+ /// For fetching more metadata, please use [`BlockingOperator::list_with`] and `metakey`.
///
/// # Examples
///
/// ```no_run
- /// # use opendal::Result;
- /// # use futures::io;
- /// # use opendal::BlockingOperator;
- /// # use opendal::EntryMode;
+ /// # use anyhow::Result;
+ /// use opendal::EntryMode;
+ /// use opendal::Metakey;
+ /// use opendal::BlockingOperator;
+ /// # fn test(op: BlockingOperator) -> Result<()> {
+ /// let mut entries = op.list("path/to/dir/")?;
+ /// for entry in entries {
+ /// match entry.metadata().mode() {
+ /// EntryMode::FILE => {
+ /// println!("Handling file")
+ /// }
+ /// EntryMode::DIR => {
+ /// println!("Handling dir {}", entry.path())
+ /// }
+ /// EntryMode::Unknown => continue,
+ /// }
+ /// }
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn list(&self, path: &str) -> Result<Vec<Entry>> {
+ self.list_with(path).call()
+ }
+
+ /// List entries within a given directory with options.
+ ///
+ /// # Notes
+ ///
+ /// ## For streaming
+ ///
+ /// This function will read all entries in the given directory. It could
+ /// take a very long time and consume a lot of memory if the directory
+ /// contains a lot of entries.
+ ///
+ /// In order to avoid this, you can use [`BlockingOperator::lister`] to list entries in
+ /// a streaming way.
+ ///
+ /// ## Metadata
+ ///
+ /// The only metadata that is guaranteed to be available is the `Mode`.
+ /// For fetching more metadata, please specify the `metakey`.
+ ///
+ /// # Examples
+ ///
+ /// ## List entries with prefix
+ ///
+ /// This function can also be used to list entries in recursive way.
+ ///
+ /// ```no_run
+ /// # use anyhow::Result;
+ /// use opendal::EntryMode;
+ /// use opendal::Metakey;
+ /// use opendal::BlockingOperator;
/// # fn test(op: BlockingOperator) -> Result<()> {
- /// let mut ds = op.list("path/to/dir/")?;
- /// while let Some(mut entry) = ds.next() {
- /// let entry = entry?;
+ /// let mut entries = op.list_with("prefix/").delimiter("").call()?;
+ /// for entry in entries {
+ /// match entry.metadata().mode() {
+ /// EntryMode::FILE => {
+ /// println!("Handling file")
+ /// }
+ /// EntryMode::DIR => {
+ /// println!("Handling dir like start a new list via meta.path()")
+ /// }
+ /// EntryMode::Unknown => continue,
+ /// }
+ /// }
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// ## List entries with metakey for more metadata
+ ///
+ /// ```no_run
+ /// # use anyhow::Result;
+ /// use opendal::EntryMode;
+ /// use opendal::Metakey;
+ /// use opendal::BlockingOperator;
+ /// # fn test(op: BlockingOperator) -> Result<()> {
+ /// let mut entries = op
+ /// .list_with("dir/")
+ /// .metakey(Metakey::ContentLength | Metakey::LastModified)
+ /// .call()?;
+ /// for entry in entries {
/// let meta = entry.metadata();
/// match meta.mode() {
/// EntryMode::FILE => {
- /// println!("Handling file")
+ /// println!(
+ /// "Handling file {} with size {}",
+ /// entry.path(),
+ /// meta.content_length()
+ /// )
/// }
/// EntryMode::DIR => {
- /// println!("Handling dir like start a new list via de.path()")
+ /// println!("Handling dir {}", entry.path())
/// }
/// EntryMode::Unknown => continue,
/// }
@@ -746,52 +837,174 @@ impl BlockingOperator {
/// # Ok(())
/// # }
/// ```
- pub fn list(&self, path: &str) -> Result<BlockingLister> {
+ pub fn list_with(&self, path: &str) -> FunctionList {
let path = normalize_path(path);
- if !validate_path(&path, EntryMode::DIR) {
- return Err(Error::new(
- ErrorKind::NotADirectory,
- "the path trying to list should end with `/`",
- )
- .with_operation("BlockingOperator::list")
- .with_context("service", self.info().scheme().into_static())
- .with_context("path", &path));
- }
+ FunctionList(OperatorFunction::new(
+ self.inner().clone(),
+ path,
+ OpList::default(),
+ |inner, path, args| {
+ if !validate_path(&path, EntryMode::DIR) {
+ return Err(Error::new(
+ ErrorKind::NotADirectory,
+ "the path trying to list should end with `/`",
+ )
+ .with_operation("BlockingOperator::list")
+ .with_context("service", inner.info().scheme().into_static())
+ .with_context("path", &path));
+ }
- let (_, pager) = self.inner().blocking_list(&path, OpList::new())?;
- Ok(BlockingLister::new(pager))
+ let (_, pager) = inner.blocking_list(&path, args)?;
+ let lister = BlockingLister::new(pager);
+
+ lister.collect()
+ },
+ ))
}
- /// List dir in flat way.
+ /// List entries within a given directory as an iterator.
///
- /// Also, this function can be used to list a prefix.
+ /// This function will create a new handle to list entries.
///
/// An error will be returned if given path doesn't end with `/`.
///
/// # Notes
///
- /// - `scan` will not return the prefix itself.
- /// - `scan` is an alias of `list_with(OpList::new().with_delimiter(""))`
+ /// ## Listing recursively
+ ///
+ /// This function only reads the children of the given directory. To read
+ /// all entries recursively, use [`BlockingOperator::lister_with`] and `delimiter("")`
+ /// instead.
+ ///
+ /// ## Metadata
+ ///
+ /// The only metadata that is guaranteed to be available is the `Mode`.
+ /// For fetching more metadata, please use [`BlockingOperator::lister_with`] and `metakey`.
///
/// # Examples
///
/// ```no_run
- /// # use opendal::Result;
+ /// # use anyhow::Result;
/// # use futures::io;
- /// # use opendal::BlockingOperator;
- /// # use opendal::EntryMode;
+ /// use futures::TryStreamExt;
+ /// use opendal::EntryMode;
+ /// use opendal::Metakey;
+ /// use opendal::BlockingOperator;
+ /// # fn test(op: BlockingOperator) -> Result<()> {
+ /// let mut ds = op.lister("path/to/dir/")?;
+ /// for de in ds {
+ /// let de = de?;
+ /// match de.metadata().mode() {
+ /// EntryMode::FILE => {
+ /// println!("Handling file")
+ /// }
+ /// EntryMode::DIR => {
+ /// println!("Handling dir like start a new list via meta.path()")
+ /// }
+ /// EntryMode::Unknown => continue,
+ /// }
+ /// }
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn lister(&self, path: &str) -> Result<BlockingLister> {
+ self.lister_with(path).call()
+ }
+
+ /// List entries within a given directory as an iterator with options.
+ ///
+ /// This function will create a new handle to list entries.
+ ///
+ /// An error will be returned if given path doesn't end with `/`.
+ ///
+ /// # Examples
+ ///
+ /// ## List current dir
+ ///
+ /// ```no_run
+ /// # use anyhow::Result;
+ /// # use futures::io;
+ /// use futures::TryStreamExt;
+ /// use opendal::EntryMode;
+ /// use opendal::Metakey;
+ /// use opendal::BlockingOperator;
+ /// # fn test(op: BlockingOperator) -> Result<()> {
+ /// let mut ds = op
+ /// .lister_with("path/to/dir/")
+ /// .limit(10)
+ /// .start_after("start")
+ /// .call()?;
+ /// for entry in ds {
+ /// let entry = entry?;
+ /// match entry.metadata().mode() {
+ /// EntryMode::FILE => {
+ /// println!("Handling file {}", entry.path())
+ /// }
+ /// EntryMode::DIR => {
+ /// println!("Handling dir {}", entry.path())
+ /// }
+ /// EntryMode::Unknown => continue,
+ /// }
+ /// }
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// ## List all files recursively
+ ///
+ /// ```no_run
+ /// # use anyhow::Result;
+ /// # use futures::io;
+ /// use futures::TryStreamExt;
+ /// use opendal::EntryMode;
+ /// use opendal::Metakey;
+ /// use opendal::BlockingOperator;
+ /// # fn test(op: BlockingOperator) -> Result<()> {
+ /// let mut ds = op.lister_with("path/to/dir/").delimiter("").call()?;
+ /// for entry in ds {
+ /// let entry = entry?;
+ /// match entry.metadata().mode() {
+ /// EntryMode::FILE => {
+ /// println!("Handling file {}", entry.path())
+ /// }
+ /// EntryMode::DIR => {
+ /// println!("Handling dir {}", entry.path())
+ /// }
+ /// EntryMode::Unknown => continue,
+ /// }
+ /// }
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// ## List files with required metadata
+ ///
+ /// ```no_run
+ /// # use anyhow::Result;
+ /// # use futures::io;
+ /// use futures::TryStreamExt;
+ /// use opendal::EntryMode;
+ /// use opendal::Metakey;
+ /// use opendal::BlockingOperator;
/// # fn test(op: BlockingOperator) -> Result<()> {
- /// let mut ds = op.list("path/to/dir/")?;
- /// while let Some(mut entry) = ds.next() {
+ /// let mut ds = op
+ /// .lister_with("path/to/dir/")
+ /// .metakey(Metakey::ContentLength | Metakey::LastModified)
+ /// .call()?;
+ /// for entry in ds {
/// let entry = entry?;
/// let meta = entry.metadata();
/// match meta.mode() {
/// EntryMode::FILE => {
- /// println!("Handling file")
+ /// println!(
+ /// "Handling file {} with size {}",
+ /// entry.path(),
+ /// meta.content_length()
+ /// )
/// }
/// EntryMode::DIR => {
- /// println!("Handling dir like start a new list via meta.path()")
+ /// println!("Handling dir {}", entry.path())
/// }
/// EntryMode::Unknown => continue,
/// }
@@ -799,22 +1012,28 @@ impl BlockingOperator {
/// # Ok(())
/// # }
/// ```
- pub fn scan(&self, path: &str) -> Result<BlockingLister> {
+ pub fn lister_with(&self, path: &str) -> FunctionLister {
let path = normalize_path(path);
- if !validate_path(&path, EntryMode::DIR) {
- return Err(Error::new(
- ErrorKind::NotADirectory,
- "the path trying to scan should end with `/`",
- )
- .with_operation("BlockingOperator::list")
- .with_context("service", self.info().scheme().into_static())
- .with_context("path", path));
- }
+ FunctionLister(OperatorFunction::new(
+ self.inner().clone(),
+ path,
+ OpList::default(),
+ |inner, path, args| {
+ if !validate_path(&path, EntryMode::DIR) {
+ return Err(Error::new(
+ ErrorKind::NotADirectory,
+ "the path trying to list should end with `/`",
+ )
+ .with_operation("BlockingOperator::list")
+ .with_context("service", inner.info().scheme().into_static())
+ .with_context("path", &path));
+ }
- let (_, pager) = self
- .inner()
- .blocking_list(&path, OpList::new().with_delimiter(""))?;
- Ok(BlockingLister::new(pager))
+ let (_, pager) = inner.blocking_list(&path, args)?;
+
+ Ok(BlockingLister::new(pager))
+ },
+ ))
}
}
diff --git a/core/src/types/operator/operator_functions.rs b/core/src/types/operator/operator_functions.rs
index 0d97e2fed..2164bcafb 100644
--- a/core/src/types/operator/operator_functions.rs
+++ b/core/src/types/operator/operator_functions.rs
@@ -20,6 +20,7 @@
//! By using functions, users can add more options for operation.
use bytes::Bytes;
+use flagset::FlagSet;
use crate::raw::*;
use crate::*;
@@ -114,3 +115,77 @@ impl FunctionDelete {
self.0.call()
}
}
+
+/// Function generated by [`BlockingOperator::list_with`].
+///
+/// Set options through the builder methods below, then run it with [`FunctionList::call`].
+pub struct FunctionList(pub(crate) OperatorFunction<OpList, Vec<Entry>>);
+
+impl FunctionList {
+ /// Change the limit of this list operation.
+ pub fn limit(mut self, v: usize) -> Self {
+ self.0 = self.0.map_args(|args| args.with_limit(v));
+ self
+ }
+
+ /// Change the `start_after` of this list operation.
+ pub fn start_after(mut self, v: &str) -> Self {
+ self.0 = self.0.map_args(|args| args.with_start_after(v));
+ self
+ }
+
+ /// Change the delimiter (default `"/"`); use `""` to list all entries recursively.
+ pub fn delimiter(mut self, v: &str) -> Self {
+ self.0 = self.0.map_args(|args| args.with_delimiter(v));
+ self
+ }
+
+ /// Change the metakey (default `Metakey::Mode`); combine flags with `|`.
+ pub fn metakey(mut self, v: impl Into<FlagSet<Metakey>>) -> Self {
+ self.0 = self.0.map_args(|args| args.with_metakey(v));
+ self
+ }
+
+ /// Run the list and collect every entry into a `Vec`. Returns an
+ /// `ErrorKind::NotADirectory` error if the path does not end with `/`.
+ pub fn call(self) -> Result<Vec<Entry>> {
+ self.0.call()
+ }
+}
+
+/// Function generated by [`BlockingOperator::lister_with`].
+///
+/// Set options through the builder methods below, then run it with [`FunctionLister::call`].
+pub struct FunctionLister(pub(crate) OperatorFunction<OpList, BlockingLister>);
+
+impl FunctionLister {
+ /// Change the limit of this list operation.
+ pub fn limit(mut self, v: usize) -> Self {
+ self.0 = self.0.map_args(|args| args.with_limit(v));
+ self
+ }
+
+ /// Change the `start_after` of this list operation.
+ pub fn start_after(mut self, v: &str) -> Self {
+ self.0 = self.0.map_args(|args| args.with_start_after(v));
+ self
+ }
+
+ /// Change the delimiter (default `"/"`); use `""` to list all entries recursively.
+ pub fn delimiter(mut self, v: &str) -> Self {
+ self.0 = self.0.map_args(|args| args.with_delimiter(v));
+ self
+ }
+
+ /// Change the metakey (default `Metakey::Mode`); combine flags with `|`.
+ pub fn metakey(mut self, v: impl Into<FlagSet<Metakey>>) -> Self {
+ self.0 = self.0.map_args(|args| args.with_metakey(v));
+ self
+ }
+
+ /// Run the list and return a [`BlockingLister`] yielding entries in a streaming way.
+ /// Returns an `ErrorKind::NotADirectory` error if the path does not end with `/`.
+ pub fn call(self) -> Result<BlockingLister> {
+ self.0.call()
+ }
+}
diff --git a/core/tests/behavior/blocking_list.rs b/core/tests/behavior/blocking_list.rs
index 745396848..2f6e9e534 100644
--- a/core/tests/behavior/blocking_list.rs
+++ b/core/tests/behavior/blocking_list.rs
@@ -48,7 +48,7 @@ pub fn test_blocking_list_dir(op: BlockingOperator) -> Result<()> {
op.write(&path, content).expect("write must succeed");
- let obs = op.list(&format!("{parent}/"))?;
+ let obs = op.lister(&format!("{parent}/"))?;
let mut found = false;
for de in obs {
let de = de?;
@@ -71,7 +71,7 @@ pub fn test_blocking_list_dir(op: BlockingOperator) -> Result<()> {
pub fn test_blocking_list_non_exist_dir(op: BlockingOperator) -> Result<()> {
let dir = format!("{}/", uuid::Uuid::new_v4());
- let obs = op.list(&dir)?;
+ let obs = op.lister(&dir)?;
let mut objects = HashMap::new();
for de in obs {
let de = de?;
@@ -98,7 +98,10 @@ pub fn test_blocking_scan(op: BlockingOperator) -> Result<()> {
}
}
- let w = op.scan(&format!("{parent}/x/"))?;
+ let w = op
+ .lister_with(&format!("{parent}/x/"))
+ .delimiter("")
+ .call()?;
let actual = w
.collect::<Vec<_>>()
.into_iter()