This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 9c39f170e feat: Add async list with metakey support (#2803)
9c39f170e is described below
commit 9c39f170e50a7f0b64fdf730a8d46ceb2b0a9a1a
Author: Xuanwo <[email protected]>
AuthorDate: Mon Aug 7 19:37:34 2023 +0800
feat: Add async list with metakey support (#2803)
* feat: Add async list with metakey support
Signed-off-by: Xuanwo <[email protected]>
* format code
Signed-off-by: Xuanwo <[email protected]>
* regen opendal.h
Signed-off-by: Xuanwo <[email protected]>
* Fix build
Signed-off-by: Xuanwo <[email protected]>
* Fix build of oay
Signed-off-by: Xuanwo <[email protected]>
* Add metakey test
Signed-off-by: Xuanwo <[email protected]>
* Add docs
Signed-off-by: Xuanwo <[email protected]>
* Fix build for object_store
Signed-off-by: Xuanwo <[email protected]>
* Fix doc test
Signed-off-by: Xuanwo <[email protected]>
* Fix unit test
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
bin/oay/src/services/s3/service.rs | 18 +-
bin/oli/src/commands/cp.rs | 3 +-
bindings/c/include/opendal.h | 43 +++-
bindings/object_store/src/lib.rs | 23 +-
core/src/layers/async_backtrace.rs | 4 +-
core/src/layers/await_tree.rs | 7 +-
core/src/layers/immutable_index.rs | 25 +--
core/src/raw/oio/entry.rs | 2 +-
.../raw/oio/read/into_seekable_read_by_range.rs | 2 +-
core/src/raw/oio/write/multipart_upload_write.rs | 3 +-
core/src/raw/ops.rs | 20 ++
core/src/services/tikv/backend.rs | 15 +-
core/src/types/entry.rs | 174 ++++++++++++---
core/src/types/list.rs | 153 +++++--------
core/src/types/metadata.rs | 52 +++++
core/src/types/operator/blocking_operator.rs | 110 +---------
core/src/types/operator/operator.rs | 244 ++++++++++-----------
core/src/types/operator/operator_futures.rs | 13 ++
core/tests/behavior/fuzz.rs | 3 +-
core/tests/behavior/list.rs | 91 ++++++++
20 files changed, 570 insertions(+), 435 deletions(-)
diff --git a/bin/oay/src/services/s3/service.rs b/bin/oay/src/services/s3/service.rs
index 3fec8625f..a41130112 100644
--- a/bin/oay/src/services/s3/service.rs
+++ b/bin/oay/src/services/s3/service.rs
@@ -25,6 +25,7 @@ use axum::response::Response;
use axum::routing::get;
use axum::Router;
use chrono::SecondsFormat;
+use futures_util::StreamExt;
use opendal::Metakey;
use opendal::Operator;
use serde::Deserialize;
@@ -103,21 +104,18 @@ async fn handle_list_objects(
.op
.lister_with(&params.prefix)
.start_after(&params.start_after)
- .await?;
+ .metakey(Metakey::Mode | Metakey::LastModified | Metakey::Etag | Metakey::ContentLength)
+ .await?
+ .chunks(1000);
- let page = lister.next_page().await?.unwrap_or_default();
+ let page = lister.next().await.unwrap_or_default();
- let is_truncated = lister.has_next().await?;
+ let is_truncated = page.len() >= 1000;
let (mut common_prefixes, mut contents) = (vec![], vec![]);
for v in page {
- let meta = state
- .op
- .metadata(
- &v,
- Metakey::Mode | Metakey::LastModified | Metakey::Etag | Metakey::ContentLength,
- )
- .await?;
+ let v = v?;
+ let meta = v.metadata();
if meta.is_dir() {
common_prefixes.push(CommonPrefix {
diff --git a/bin/oli/src/commands/cp.rs b/bin/oli/src/commands/cp.rs
index 9bb40bad4..5f17f4fa9 100644
--- a/bin/oli/src/commands/cp.rs
+++ b/bin/oli/src/commands/cp.rs
@@ -25,7 +25,6 @@ use clap::ArgAction;
use clap::ArgMatches;
use clap::Command;
use futures::TryStreamExt;
-use opendal::Metakey;
use crate::config::Config;
@@ -59,7 +58,7 @@ pub async fn main(args: &ArgMatches) -> Result<()> {
let dst_root = Path::new(&dst_path);
let mut ds = src_op.lister_with(&src_path).delimiter("").await?;
while let Some(de) = ds.try_next().await? {
- let meta = src_op.metadata(&de, Metakey::Mode).await?;
+ let meta = de.metadata();
if meta.mode().is_dir() {
continue;
}
diff --git a/bindings/c/include/opendal.h b/bindings/c/include/opendal.h
index d83356324..bc47bf5b5 100644
--- a/bindings/c/include/opendal.h
+++ b/bindings/c/include/opendal.h
@@ -85,7 +85,7 @@ typedef enum opendal_code {
* BlockingLister is designed to list entries at given path in a blocking
* manner.
*
- * Users can construct Lister by `blocking_list` or `blocking_scan`.
+ * Users can construct Lister by `blocking_lister`.
*/
typedef struct BlockingLister BlockingLister;
@@ -122,7 +122,46 @@ typedef struct BlockingLister BlockingLister;
typedef struct BlockingOperator BlockingOperator;
/**
- * Entry is the file/dir entry returned by `Lister`.
+ * Entry returned by [`Lister`] or [`BlockingLister`] to represent a path and it's relative metadata.
+ *
+ * # Notes
+ *
+ * Entry returned by [`Lister`] or [`BlockingLister`] may carry some already known metadata.
+ * Lister by default only make sure that `Mode` is fetched. To make sure the entry contains
+ * metadata you want, please use `list_with` or `lister_with` and `metakey`.
+ *
+ * For example:
+ *
+ * ```no_run
+ * # use anyhow::Result;
+ * use opendal::EntryMode;
+ * use opendal::Metakey;
+ * use opendal::Operator;
+ * # #[tokio::main]
+ * # async fn test(op: Operator) -> Result<()> {
+ * let mut entries = op
+ * .list_with("dir/")
+ * .metakey(Metakey::ContentLength | Metakey::LastModified)
+ * .await?;
+ * for entry in entries {
+ * let meta = entry.metadata();
+ * match meta.mode() {
+ * EntryMode::FILE => {
+ * println!(
+ * "Handling file {} with size {}",
+ * entry.path(),
+ * meta.content_length()
+ * )
+ * }
+ * EntryMode::DIR => {
+ * println!("Handling dir {}", entry.path())
+ * }
+ * EntryMode::Unknown => continue,
+ * }
+ * }
+ * # Ok(())
+ * # }
+ * ```
*/
typedef struct Entry Entry;
diff --git a/bindings/object_store/src/lib.rs b/bindings/object_store/src/lib.rs
index 4e6784639..c05aaa231 100644
--- a/bindings/object_store/src/lib.rs
+++ b/bindings/object_store/src/lib.rs
@@ -149,19 +149,16 @@ impl ObjectStore for OpendalStore {
let stream = self
.inner
.lister_with(&path)
+ .metakey(Metakey::ContentLength | Metakey::LastModified)
.delimiter("")
.await
.map_err(|err| format_object_store_error(err, &path))?;
let stream = stream.then(|res| async {
let entry = res.map_err(|err| format_object_store_error(err, ""))?;
- let meta = self
- .inner
- .metadata(&entry, Metakey::ContentLength | Metakey::LastModified)
- .await
- .map_err(|err| format_object_store_error(err, entry.path()))?;
+ let meta = entry.metadata();
- Ok(format_object_meta(entry.path(), &meta))
+ Ok(format_object_meta(entry.path(), meta))
});
Ok(stream.boxed())
@@ -171,7 +168,8 @@ impl ObjectStore for OpendalStore {
let path = prefix.map_or("".into(), |x| format!("{}/", x));
let mut stream = self
.inner
- .lister(&path)
+ .lister_with(&path)
+ .metakey(Metakey::Mode | Metakey::ContentLength | Metakey::LastModified)
.await
.map_err(|err| format_object_store_error(err, &path))?;
@@ -180,19 +178,12 @@ impl ObjectStore for OpendalStore {
while let Some(res) = stream.next().await {
let entry = res.map_err(|err| format_object_store_error(err, ""))?;
- let meta = self
- .inner
- .metadata(
- &entry,
- Metakey::Mode | Metakey::ContentLength | Metakey::LastModified,
- )
- .await
- .map_err(|err| format_object_store_error(err, entry.path()))?;
+ let meta = entry.metadata();
if meta.is_dir() {
common_prefixes.push(entry.path().into());
} else {
- objects.push(format_object_meta(entry.path(), &meta));
+ objects.push(format_object_meta(entry.path(), meta));
}
}
diff --git a/core/src/layers/async_backtrace.rs b/core/src/layers/async_backtrace.rs
index 5dccf98be..7ce2d8757 100644
--- a/core/src/layers/async_backtrace.rs
+++ b/core/src/layers/async_backtrace.rs
@@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+use async_trait::async_trait;
+
use crate::raw::*;
use crate::*;
-use async_trait::async_trait;
-
/// Add Efficient, logical 'stack' traces of async functions for the underlying services.
///
/// # Async Backtrace
diff --git a/core/src/layers/await_tree.rs b/core/src/layers/await_tree.rs
index 583462df6..6f6d32b0b 100644
--- a/core/src/layers/await_tree.rs
+++ b/core/src/layers/await_tree.rs
@@ -15,13 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-use crate::raw::*;
-use crate::*;
-
use async_trait::async_trait;
-
use await_tree::InstrumentAwait;
+use crate::raw::*;
+use crate::*;
+
/// Add a Instrument await-tree for actor-based applications to the underlying services.
///
/// # AwaitTree
diff --git a/core/src/layers/immutable_index.rs b/core/src/layers/immutable_index.rs
index cc2cf1907..374482151 100644
--- a/core/src/layers/immutable_index.rs
+++ b/core/src/layers/immutable_index.rs
@@ -310,10 +310,7 @@ mod tests {
"duplicated value: {}",
entry.path()
);
- map.insert(
- entry.path().to_string(),
- op.metadata(&entry, Metakey::Mode).await?.mode(),
- );
+ map.insert(entry.path().to_string(), entry.metadata().mode());
}
assert_eq!(map["file"], EntryMode::FILE);
@@ -351,10 +348,7 @@ mod tests {
"duplicated value: {}",
entry.path()
);
- map.insert(
- entry.path().to_string(),
- op.metadata(&entry, Metakey::Mode).await?.mode(),
- );
+ map.insert(entry.path().to_string(), entry.metadata().mode());
}
debug!("current files: {:?}", map);
@@ -398,10 +392,7 @@ mod tests {
"duplicated value: {}",
entry.path()
);
- map.insert(
- entry.path().to_string(),
- op.metadata(&entry, Metakey::Mode).await?.mode(),
- );
+ map.insert(entry.path().to_string(), entry.metadata().mode());
}
assert_eq!(map.len(), 1);
@@ -417,10 +408,7 @@ mod tests {
"duplicated value: {}",
entry.path()
);
- map.insert(
- entry.path().to_string(),
- op.metadata(&entry, Metakey::Mode).await?.mode(),
- );
+ map.insert(entry.path().to_string(), entry.metadata().mode());
}
assert_eq!(map["dataset/stateful/ontime_2007_200.csv"], EntryMode::FILE);
@@ -462,10 +450,7 @@ mod tests {
"duplicated value: {}",
entry.path()
);
- map.insert(
- entry.path().to_string(),
- op.metadata(&entry, Metakey::Mode).await?.mode(),
- );
+ map.insert(entry.path().to_string(), entry.metadata().mode());
}
debug!("current files: {:?}", map);
diff --git a/core/src/raw/oio/entry.rs b/core/src/raw/oio/entry.rs
index f476aa887..43c4b9631 100644
--- a/core/src/raw/oio/entry.rs
+++ b/core/src/raw/oio/entry.rs
@@ -79,6 +79,6 @@ impl Entry {
///
/// NOTE: implement this by hand to avoid leaking raw entry to end-users.
pub(crate) fn into_entry(self) -> crate::Entry {
- crate::Entry::new_with(self.path, self.meta)
+ crate::Entry::new(self.path, self.meta)
}
}
diff --git a/core/src/raw/oio/read/into_seekable_read_by_range.rs b/core/src/raw/oio/read/into_seekable_read_by_range.rs
index a15c99e2a..b996c61e9 100644
--- a/core/src/raw/oio/read/into_seekable_read_by_range.rs
+++ b/core/src/raw/oio/read/into_seekable_read_by_range.rs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use bytes::Bytes;
use std::future::Future;
use std::io::SeekFrom;
use std::pin::Pin;
@@ -24,6 +23,7 @@ use std::task::ready;
use std::task::Context;
use std::task::Poll;
+use bytes::Bytes;
use futures::future::BoxFuture;
use crate::raw::*;
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs
index 0fe035439..8ca10c462 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -18,7 +18,8 @@
use async_trait::async_trait;
use bytes::Bytes;
-use crate::{raw::*, *};
+use crate::raw::*;
+use crate::*;
const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs
index 308930b5c..715892f0c 100644
--- a/core/src/raw/ops.rs
+++ b/core/src/raw/ops.rs
@@ -21,7 +21,10 @@
use std::time::Duration;
+use flagset::FlagSet;
+
use crate::raw::*;
+use crate::Metakey;
/// Args for `create` operation.
///
@@ -77,6 +80,8 @@ pub struct OpList {
/// The delimiter used to for the list operation. Default to be `/`
delimiter: String,
+
+ metakey: FlagSet<Metakey>,
}
impl Default for OpList {
@@ -85,6 +90,8 @@ impl Default for OpList {
limit: None,
start_after: None,
delimiter: "/".to_string(),
+ // By default, we want to know what's the mode of this entry.
+ metakey: Metakey::Mode.into(),
}
}
}
@@ -127,6 +134,19 @@ impl OpList {
pub fn delimiter(&self) -> &str {
&self.delimiter
}
+
+ /// Change the metakey of this list operation.
+ ///
+ /// The default metakey is `Metakey::Mode`.
+ pub fn with_metakey(mut self, metakey: impl Into<FlagSet<Metakey>>) -> Self {
+ self.metakey = metakey.into();
+ self
+ }
+
+ /// Get the current metakey.
+ pub fn metakey(&self) -> FlagSet<Metakey> {
+ self.metakey
+ }
}
/// Args for `presign` operation.
diff --git a/core/src/services/tikv/backend.rs b/core/src/services/tikv/backend.rs
index f70d6bb5a..0a9417181 100644
--- a/core/src/services/tikv/backend.rs
+++ b/core/src/services/tikv/backend.rs
@@ -16,23 +16,22 @@
// under the License.
use std::collections::HashMap;
-use tikv_client::Config;
-use tikv_client::RawClient;
+use std::fmt::Debug;
+use std::fmt::Formatter;
-use crate::raw::adapters::kv;
-use crate::Capability;
-use crate::Scheme;
use async_trait::async_trait;
+use tikv_client::Config;
+use tikv_client::RawClient;
use tokio::sync::OnceCell;
+use crate::raw::adapters::kv;
use crate::Builder;
+use crate::Capability;
use crate::Error;
use crate::ErrorKind;
+use crate::Scheme;
use crate::*;
-use std::fmt::Debug;
-use std::fmt::Formatter;
-
/// TiKV backend builder
#[derive(Clone, Default)]
pub struct TikvBuilder {
diff --git a/core/src/types/entry.rs b/core/src/types/entry.rs
index d0a8be2db..d6a062828 100644
--- a/core/src/types/entry.rs
+++ b/core/src/types/entry.rs
@@ -18,63 +18,179 @@
use crate::raw::*;
use crate::*;
-/// Entry is the file/dir entry returned by `Lister`.
+/// Entry returned by [`Lister`] or [`BlockingLister`] to represent a path and it's relative metadata.
+///
+/// # Notes
+///
+/// Entry returned by [`Lister`] or [`BlockingLister`] may carry some already known metadata.
+/// Lister by default only make sure that `Mode` is fetched. To make sure the entry contains
+/// metadata you want, please use `list_with` or `lister_with` and `metakey`.
+///
+/// For example:
+///
+/// ```no_run
+/// # use anyhow::Result;
+/// use opendal::EntryMode;
+/// use opendal::Metakey;
+/// use opendal::Operator;
+/// # #[tokio::main]
+/// # async fn test(op: Operator) -> Result<()> {
+/// let mut entries = op
+/// .list_with("dir/")
+/// .metakey(Metakey::ContentLength | Metakey::LastModified)
+/// .await?;
+/// for entry in entries {
+/// let meta = entry.metadata();
+/// match meta.mode() {
+/// EntryMode::FILE => {
+/// println!(
+/// "Handling file {} with size {}",
+/// entry.path(),
+/// meta.content_length()
+/// )
+/// }
+/// EntryMode::DIR => {
+/// println!("Handling dir {}", entry.path())
+/// }
+/// EntryMode::Unknown => continue,
+/// }
+/// }
+/// # Ok(())
+/// # }
+/// ```
#[derive(Clone, Debug)]
pub struct Entry {
- /// Path of the entry.
+ /// Path of this entry.
path: String,
- /// Optional cached metadata
- metadata: Option<Metadata>,
+ /// Metadata of this entry.
+ metadata: Metadata,
}
impl Entry {
- /// Create an entry with .
+ /// Create an entry with metadata.
///
/// # Notes
///
- /// This function is crate internal only. Users don't have public
- /// methods to construct an entry with arbitrary cached metadata.
- ///
/// The only way to get an entry with associated cached metadata
- /// is `Operator::list` or `Operator::scan`.
- pub(crate) fn new_with(path: String, metadata: Metadata) -> Self {
- Self {
- path,
- metadata: Some(metadata),
- }
- }
-
- /// Create an [`Entry`] with empty cached metadata.
- pub fn new(path: &str) -> Self {
- Self {
- path: normalize_path(path),
- metadata: None,
- }
+ /// is `Operator::list`.
+ pub(crate) fn new(path: String, metadata: Metadata) -> Self {
+ Self { path, metadata }
}
/// Path of entry. Path is relative to operator's root.
+ ///
/// Only valid in current operator.
+ ///
+ /// If this entry is a dir, `path` MUST end with `/`
+ /// Otherwise, `path` MUST NOT end with `/`.
pub fn path(&self) -> &str {
&self.path
}
/// Name of entry. Name is the last segment of path.
///
- /// If this entry is a dir, `Name` MUST endswith `/`
- /// Otherwise, `Name` MUST NOT endswith `/`.
+ /// If this entry is a dir, `name` MUST end with `/`
+ /// Otherwise, `name` MUST NOT end with `/`.
pub fn name(&self) -> &str {
get_basename(&self.path)
}
- /// Get the cached metadata of entry.
+ /// Fetch metadata of this entry.
///
/// # Notes
///
- /// This function is crate internal only. Because the returning
- /// metadata could be incomplete. Users must use `Operator::metadata`
- /// to query the cached metadata instead.
- pub(crate) fn metadata(&self) -> &Option<Metadata> {
+ /// Metadata only guaranteed to have results of `metakey` (which default to `Metakey::Mode`).
+ ///
+ /// - `Some(T)` means the metadata is valid.
+ /// - `None` means the metadata is not provided by services.
+ ///
+ /// Visiting a metadata that not covered by `metakey` could result in panic.
+ ///
+ /// # Examples
+ ///
+ /// Please use `metakey` to specify the metadata you want, for example:
+ ///
+ /// ```no_run
+ /// # use anyhow::Result;
+ /// use opendal::EntryMode;
+ /// use opendal::Metakey;
+ /// use opendal::Operator;
+ /// # #[tokio::main]
+ /// # async fn test(op: Operator) -> Result<()> {
+ /// let mut entries = op
+ /// .list_with("dir/")
+ /// .metakey(Metakey::ContentLength | Metakey::LastModified)
+ /// .await?;
+ /// for entry in entries {
+ /// let meta = entry.metadata();
+ /// match meta.mode() {
+ /// EntryMode::FILE => {
+ /// println!(
+ /// "Handling file {} with size {}",
+ /// entry.path(),
+ /// meta.content_length()
+ /// )
+ /// }
+ /// EntryMode::DIR => {
+ /// println!("Handling dir {}", entry.path())
+ /// }
+ /// EntryMode::Unknown => continue,
+ /// }
+ /// }
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn metadata(&self) -> &Metadata {
&self.metadata
}
+
+ /// Consume this entry to get it's path and metadata.
+ ///
+ /// # Notes
+ ///
+ /// Metadata only guaranteed to have results of `metakey` (which default to `Metakey::Mode`).
+ ///
+ /// - `Some(T)` means the metadata is valid.
+ /// - `None` means the metadata is not provided by services.
+ ///
+ /// Visiting a metadata that not covered by `metakey` could result in panic.
+ ///
+ /// # Examples
+ ///
+ /// Please use `metakey` to specify the metadata you want, for example:
+ ///
+ /// ```no_run
+ /// # use anyhow::Result;
+ /// use opendal::EntryMode;
+ /// use opendal::Metakey;
+ /// use opendal::Operator;
+ /// # #[tokio::main]
+ /// # async fn test(op: Operator) -> Result<()> {
+ /// let mut entries = op
+ /// .list_with("dir/")
+ /// .metakey(Metakey::ContentLength | Metakey::LastModified)
+ /// .await?;
+ /// for entry in entries {
+ /// let (path, meta) = entry.into_parts();
+ /// match meta.mode() {
+ /// EntryMode::FILE => {
+ /// println!(
+ /// "Handling file {} with size {}",
+ /// path,
+ /// meta.content_length()
+ /// )
+ /// }
+ /// EntryMode::DIR => {
+ /// println!("Handling dir {}", path)
+ /// }
+ /// EntryMode::Unknown => continue,
+ /// }
+ /// }
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn into_parts(self) -> (String, Metadata) {
+ (self.path, self.metadata)
+ }
}
diff --git a/core/src/types/list.rs b/core/src/types/list.rs
index 275a76db7..514056dba 100644
--- a/core/src/types/list.rs
+++ b/core/src/types/list.rs
@@ -16,12 +16,12 @@
// under the License.
use std::collections::VecDeque;
-use std::mem;
use std::pin::Pin;
use std::task::ready;
use std::task::Context;
use std::task::Poll;
+use flagset::FlagSet;
use futures::future::BoxFuture;
use futures::FutureExt;
use futures::Stream;
@@ -29,22 +29,26 @@ use futures::Stream;
use crate::raw::*;
use crate::*;
+/// Future constructed by listing.
+type ListFuture = BoxFuture<'static, (oio::Pager, Result<Option<Vec<oio::Entry>>>)>;
+/// Future constructed by stating.
+type StatFuture = BoxFuture<'static, (String, Result<RpStat>)>;
+
/// Lister is designed to list entries at given path in an asynchronous
/// manner.
///
-/// Users can construct Lister by `list` or `scan`.
+/// Users can construct Lister by [`Operator::lister`].
///
-/// User can use lister as `Stream<Item = Result<Entry>>` or
-/// call `next_page` directly.
+/// User can use lister as `Stream<Item = Result<Entry>>`.
pub struct Lister {
- pager: Option<oio::Pager>,
+ acc: FusedAccessor,
+ /// required_metakey is the metakey required by users.
+ required_metakey: FlagSet<Metakey>,
buf: VecDeque<oio::Entry>,
- /// We will move `pager` inside future and return it back while future is ready.
- /// Thus, we should not allow calling other function while we already have
- /// a future.
- #[allow(clippy::type_complexity)]
- fut: Option<BoxFuture<'static, (oio::Pager, Result<Option<Vec<oio::Entry>>>)>>,
+ pager: Option<oio::Pager>,
+ listing: Option<ListFuture>,
+ stating: Option<StatFuture>,
}
/// # Safety
@@ -54,75 +58,19 @@ unsafe impl Sync for Lister {}
impl Lister {
/// Create a new lister.
- pub(crate) fn new(pager: oio::Pager) -> Self {
- Self {
- pager: Some(pager),
- buf: VecDeque::default(),
- fut: None,
- }
- }
-
- /// has_next can be used to check if there are more pages.
- pub async fn has_next(&mut self) -> Result<bool> {
- debug_assert!(
- self.fut.is_none(),
- "there are ongoing futures for next page"
- );
+ pub(crate) async fn create(acc: FusedAccessor, path: &str, args: OpList) -> Result<Self> {
+ let required_metakey = args.metakey();
+ let (_, pager) = acc.list(path, args).await?;
- if !self.buf.is_empty() {
- return Ok(true);
- }
-
- let entries = match self
- .pager
- .as_mut()
- .expect("pager must be valid")
- .next()
- .await?
- {
- // Ideally, the convert from `Vec` to `VecDeque` will not do reallocation.
- //
- // However, this could be changed as described in [impl<T, A> From<Vec<T, A>> for VecDeque<T, A>](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E)
- Some(entries) => entries.into(),
- None => return Ok(false),
- };
- // Push fetched entries into buffer.
- self.buf = entries;
-
- Ok(true)
- }
+ Ok(Self {
+ acc,
+ required_metakey,
- /// next_page can be used to fetch a new page.
- ///
- /// # Notes
- ///
- /// Don't mix the usage of `next_page` and `Stream<Item = Result<Entry>>`.
- /// Always using the same calling style.
- pub async fn next_page(&mut self) -> Result<Option<Vec<Entry>>> {
- debug_assert!(
- self.fut.is_none(),
- "there are ongoing futures for next page"
- );
-
- let entries = if !self.buf.is_empty() {
- mem::take(&mut self.buf)
- } else {
- match self
- .pager
- .as_mut()
- .expect("pager must be valid")
- .next()
- .await?
- {
- // Ideally, the convert from `Vec` to `VecDeque` will not do reallocation.
- //
- // However, this could be changed as described in [impl<T, A> From<Vec<T, A>> for VecDeque<T, A>](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E)
- Some(entries) => entries.into(),
- None => return Ok(None),
- }
- };
-
- Ok(Some(entries.into_iter().map(|v| v.into_entry()).collect()))
+ buf: VecDeque::new(),
+ pager: Some(pager),
+ listing: None,
+ stating: None,
+ })
}
}
@@ -130,22 +78,44 @@ impl Stream for Lister {
type Item = Result<Entry>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ if let Some(fut) = self.stating.as_mut() {
+ let (path, rp) = ready!(fut.poll_unpin(cx));
+ let metadata = rp?.into_metadata();
+
+ self.stating = None;
+ return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
+ }
+
if let Some(oe) = self.buf.pop_front() {
- return Poll::Ready(Some(Ok(oe.into_entry())));
+ let (path, metadata) = oe.into_entry().into_parts();
+ // TODO: we can optimize this by checking the provided metakey provided by services.
+ if metadata.contains_bit(self.required_metakey) {
+ return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
+ }
+
+ let acc = self.acc.clone();
+ let fut = async move {
+ let path = path;
+ let res = acc.stat(&path, OpStat::default()).await;
+
+ (path, res)
+ };
+ self.stating = Some(Box::pin(fut));
+ return self.poll_next(cx);
}
- if let Some(fut) = self.fut.as_mut() {
+ if let Some(fut) = self.listing.as_mut() {
let (op, res) = ready!(fut.poll_unpin(cx));
self.pager = Some(op);
return match res? {
Some(oes) => {
- self.fut = None;
+ self.listing = None;
self.buf = oes.into();
self.poll_next(cx)
}
None => {
- self.fut = None;
+ self.listing = None;
Poll::Ready(None)
}
};
@@ -157,7 +127,7 @@ impl Stream for Lister {
(pager, res)
};
- self.fut = Some(Box::pin(fut));
+ self.listing = Some(Box::pin(fut));
self.poll_next(cx)
}
}
@@ -165,7 +135,7 @@ impl Stream for Lister {
/// BlockingLister is designed to list entries at given path in a blocking
/// manner.
///
-/// Users can construct Lister by `blocking_list` or `blocking_scan`.
+/// Users can construct Lister by `blocking_lister`.
pub struct BlockingLister {
pager: oio::BlockingPager,
buf: VecDeque<oio::Entry>,
@@ -184,23 +154,6 @@ impl BlockingLister {
buf: VecDeque::default(),
}
}
-
- /// next_page can be used to fetch a new page.
- pub fn next_page(&mut self) -> Result<Option<Vec<Entry>>> {
- let entries = if !self.buf.is_empty() {
- mem::take(&mut self.buf)
- } else {
- match self.pager.next()? {
- // Ideally, the convert from `Vec` to `VecDeque` will not do reallocation.
- //
- // However, this could be changed as described in [impl<T, A> From<Vec<T, A>> for VecDeque<T, A>](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E)
- Some(entries) => entries.into(),
- None => return Ok(None),
- }
- };
-
- Ok(Some(entries.into_iter().map(|v| v.into_entry()).collect()))
- }
}
/// TODO: we can implement next_chunk.
diff --git a/core/src/types/metadata.rs b/core/src/types/metadata.rs
index 02f5bb90d..66a7b41d4 100644
--- a/core/src/types/metadata.rs
+++ b/core/src/types/metadata.rs
@@ -85,6 +85,18 @@ impl Metadata {
self
}
+ /// Check if there metadata already contains given bit.
+ pub(crate) fn contains_bit(&self, bit: impl Into<FlagSet<Metakey>>) -> bool {
+ let input_bit = bit.into();
+
+ // If meta already contains complete, we don't need to check.
+ if self.bit.contains(Metakey::Complete) {
+ return true;
+ }
+
+ self.bit.contains(input_bit)
+ }
+
/// mode represent this entry's mode.
pub fn mode(&self) -> EntryMode {
debug_assert!(
@@ -155,6 +167,11 @@ impl Metadata {
///
/// `Content-Length` is defined by [RFC 7230](https://httpwg.org/specs/rfc7230.html#header.content-length)
/// Refer to [MDN Content-Length](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Length) for more information.
+ ///
+ /// # Panics
+ ///
+ /// This value is only available when calling on result of `stat` or `list` with
+ /// [`Metakey::ContentLength`], otherwise it will panic.
pub fn content_length(&self) -> u64 {
debug_assert!(
self.bit.contains(Metakey::ContentLength) || self.bit.contains(Metakey::Complete),
@@ -189,6 +206,11 @@ impl Metadata {
/// And removed by [RFC 7231](https://www.rfc-editor.org/rfc/rfc7231).
///
/// OpenDAL will try its best to set this value, but not guarantee this value is the md5 of content.
+ ///
+ /// # Panics
+ ///
+ /// This value is only available when calling on result of `stat` or `list` with
+ /// [`Metakey::ContentMd5`], otherwise it will panic.
pub fn content_md5(&self) -> Option<&str> {
debug_assert!(
self.bit.contains(Metakey::ContentMd5) || self.bit.contains(Metakey::Complete),
@@ -221,6 +243,11 @@ impl Metadata {
/// Content Type of this entry.
///
/// Content Type is defined by [RFC 9110](https://httpwg.org/specs/rfc9110.html#field.content-type).
+ ///
+ /// # Panics
+ ///
+ /// This value is only available when calling on result of `stat` or `list` with
+ /// [`Metakey::ContentType`], otherwise it will panic.
pub fn content_type(&self) -> Option<&str> {
debug_assert!(
self.bit.contains(Metakey::ContentType) || self.bit.contains(Metakey::Complete),
@@ -251,6 +278,11 @@ impl Metadata {
/// Content Range of this entry.
///
/// Content Range is defined by [RFC 9110](https://httpwg.org/specs/rfc9110.html#field.content-range).
+ ///
+ /// # Panics
+ ///
+ /// This value is only available when calling on result of `stat` or `list` with
+ /// [`Metakey::ContentRange`], otherwise it will panic.
pub fn content_range(&self) -> Option<BytesContentRange> {
debug_assert!(
self.bit.contains(Metakey::ContentRange) || self.bit.contains(Metakey::Complete),
@@ -284,6 +316,11 @@ impl Metadata {
/// Refer to [MDN Last-Modified](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified) for more information.
///
/// OpenDAL parse the raw value into [`DateTime`] for convenient.
+ ///
+ /// # Panics
+ ///
+ /// This value is only available when calling on result of `stat` or `list` with
+ /// [`Metakey::LastModified`], otherwise it will panic.
pub fn last_modified(&self) -> Option<DateTime<Utc>> {
debug_assert!(
self.bit.contains(Metakey::LastModified) || self.bit.contains(Metakey::Complete),
@@ -324,6 +361,11 @@ impl Metadata {
/// - `W/"0815"`
///
/// `"` is part of etag.
+ ///
+ /// # Panics
+ ///
+ /// This value is only available when calling on result of `stat` or `list` with
+ /// [`Metakey::Etag`], otherwise it will panic.
pub fn etag(&self) -> Option<&str> {
debug_assert!(
self.bit.contains(Metakey::Etag) || self.bit.contains(Metakey::Complete),
@@ -378,6 +420,11 @@ impl Metadata {
/// - "inline"
/// - "attachment"
/// - "attachment; filename=\"filename.jpg\""
+ ///
+ /// # Panics
+ ///
+ /// This value is only available when calling on result of `stat` or `list` with
+ /// [`Metakey::ContentDisposition`], otherwise it will panic.
pub fn content_disposition(&self) -> Option<&str> {
debug_assert!(
self.bit.contains(Metakey::ContentDisposition) || self.bit.contains(Metakey::Complete),
@@ -426,6 +473,11 @@ impl Metadata {
/// Version is a string that can be used to identify the version of this entry.
///
/// This field may come out from the version control system, like object versioning in AWS S3.
+ ///
+ /// # Panics
+ ///
+ /// This value is only available when calling on result of `stat` or `list` with
+ /// [`Metakey::Version`], otherwise it will panic.
pub fn version(&self) -> Option<&str> {
debug_assert!(
self.bit.contains(Metakey::Version) || self.bit.contains(Metakey::Complete),
diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs
index 2b479624a..5f8c6d279 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -19,7 +19,6 @@ use std::io::Read;
use std::ops::RangeBounds;
use bytes::Bytes;
-use flagset::FlagSet;
use super::operator_functions::*;
use crate::raw::*;
@@ -153,99 +152,6 @@ impl BlockingOperator {
Ok(meta)
}
- /// Get current metadata with cache in blocking way.
- ///
- /// `metadata` will check the given query with already cached metadata
- /// first. And query from storage if not found.
- ///
- /// # Notes
- ///
- /// Use `metadata` if you are working with entries returned by
- /// [`Lister`]. It's highly possible that metadata you want
- /// has already been cached.
- ///
- /// You may want to use `stat`, if you:
- ///
- /// - Want detect the outside changes of file.
- /// - Don't want to read from cached file metadata.
- ///
- /// # Behavior
- ///
- /// Visiting not fetched metadata will lead to panic in debug build.
- /// It must be a bug, please fix it instead.
- ///
- /// # Examples
- ///
- /// ## Query already cached metadata
- ///
- /// By query metadata with `None`, we can only query in-memory metadata
- /// cache. In this way, we can make sure that no API call will send.
- ///
- /// ```
- /// # use anyhow::Result;
- /// # use opendal::BlockingOperator;
- /// use opendal::Entry;
- ///
- /// # fn test(op: BlockingOperator, entry: Entry) -> Result<()> {
- /// let meta = op.metadata(&entry, None)?;
- /// // content length COULD be correct.
- /// let _ = meta.content_length();
- /// // etag COULD be correct.
- /// let _ = meta.etag();
- /// # Ok(())
- /// # }
- /// ```
- ///
- /// ## Query content length and content type
- ///
- /// ```
- /// # use anyhow::Result;
- /// # use opendal::BlockingOperator;
- /// use opendal::Entry;
- /// use opendal::Metakey;
- ///
- /// # fn test(op: BlockingOperator, entry: Entry) -> Result<()> {
- /// let meta = op.metadata(&entry, { Metakey::ContentLength |
Metakey::ContentType })?;
- /// // content length MUST be correct.
- /// let _ = meta.content_length();
- /// // etag COULD be correct.
- /// let _ = meta.etag();
- /// # Ok(())
- /// # }
- /// ```
- ///
- /// ## Query all metadata
- ///
- /// By query metadata with `Complete`, we can make sure that we have
fetched all metadata of this entry.
- ///
- /// ```
- /// # use anyhow::Result;
- /// # use opendal::BlockingOperator;
- /// use opendal::Entry;
- /// use opendal::Metakey;
- ///
- /// # fn test(op: BlockingOperator, entry: Entry) -> Result<()> {
- /// let meta = op.metadata(&entry, { Metakey::Complete })?;
- /// // content length MUST be correct.
- /// let _ = meta.content_length();
- /// // etag MUST be correct.
- /// let _ = meta.etag();
- /// # Ok(())
- /// # }
- /// ```
- pub fn metadata(&self, entry: &Entry, flags: impl Into<FlagSet<Metakey>>)
-> Result<Metadata> {
- // Check if cached metadata saticifies the query.
- if let Some(meta) = entry.metadata() {
- if meta.bit().contains(flags) ||
meta.bit().contains(Metakey::Complete) {
- return Ok(meta.clone());
- }
- }
-
- // Else request from backend..
- let meta = self.stat(entry.path())?;
- Ok(meta)
- }
-
/// Check if this path exists or not.
///
/// # Example
@@ -824,11 +730,9 @@ impl BlockingOperator {
/// # use opendal::EntryMode;
/// # fn test(op: BlockingOperator) -> Result<()> {
/// let mut ds = op.list("path/to/dir/")?;
- /// while let Some(mut de) = ds.next() {
- /// let meta = op.metadata(&de?, {
- /// use opendal::Metakey::*;
- /// Mode
- /// })?;
+ /// while let Some(mut entry) = ds.next() {
+ /// let entry = entry?;
+ /// let meta = entry.metadata();
/// match meta.mode() {
/// EntryMode::FILE => {
/// println!("Handling file")
@@ -879,11 +783,9 @@ impl BlockingOperator {
/// # use opendal::EntryMode;
/// # fn test(op: BlockingOperator) -> Result<()> {
/// let mut ds = op.list("path/to/dir/")?;
- /// while let Some(mut de) = ds.next() {
- /// let meta = op.metadata(&de?, {
- /// use opendal::Metakey::*;
- /// Mode
- /// })?;
+ /// while let Some(mut entry) = ds.next() {
+ /// let entry = entry?;
+ /// let meta = entry.metadata();
/// match meta.mode() {
/// EntryMode::FILE => {
/// println!("Handling file")
diff --git a/core/src/types/operator/operator.rs
b/core/src/types/operator/operator.rs
index f1cdb47ac..92959e726 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -19,7 +19,6 @@ use std::ops::RangeBounds;
use std::time::Duration;
use bytes::Bytes;
-use flagset::FlagSet;
use futures::stream;
use futures::AsyncReadExt;
use futures::Stream;
@@ -159,18 +158,13 @@ impl Operator {
}
}
- /// Get current path's metadata **without cache** directly.
+ /// Get current path's metadata.
///
/// # Notes
///
- /// Use `stat` if you:
- ///
- /// - Want to detect the outside changes of path.
- /// - Don't want to read from cached metadata.
- ///
- /// You may want to use `metadata` if you are working with entries
- /// returned by [`Lister`]. It's highly possible that metadata
- /// you want has already been cached.
+ /// To fetch metadata of entries returned by [`Lister`], it's better to
use [`Operator::list_with`] or
+ /// [`Operator::lister_with`] with a `metakey` query like `Metakey::ContentLength |
Metakey::LastModified`
+ /// so that we can avoid extra requests.
///
/// # Examples
///
@@ -245,107 +239,6 @@ impl Operator {
fut
}
- /// Get current metadata with cache.
- ///
- /// `metadata` will check the given query with already cached metadata
- /// first. And query from storage if not found.
- ///
- /// # Notes
- ///
- /// Use `metadata` if you are working with entries returned by
- /// [`Lister`]. It's highly possible that metadata you want
- /// has already been cached.
- ///
- /// You may want to use `stat`, if you:
- ///
- /// - Want to detect the outside changes of path.
- /// - Don't want to read from cached metadata.
- ///
- /// # Behavior
- ///
- /// Visiting not fetched metadata will lead to panic in debug build.
- /// It must be a bug, please fix it instead.
- ///
- /// # Examples
- ///
- /// ## Query already cached metadata
- ///
- /// By querying metadata with `None`, we can only query in-memory metadata
- /// cache. In this way, we can make sure that no API call will be sent.
- ///
- /// ```
- /// # use anyhow::Result;
- /// # use opendal::Operator;
- /// use opendal::Entry;
- /// # #[tokio::main]
- /// # async fn test(op: Operator, entry: Entry) -> Result<()> {
- /// let meta = op.metadata(&entry, None).await?;
- /// // content length COULD be correct.
- /// let _ = meta.content_length();
- /// // etag COULD be correct.
- /// let _ = meta.etag();
- /// # Ok(())
- /// # }
- /// ```
- ///
- /// ## Query content length and content type
- ///
- /// ```
- /// # use anyhow::Result;
- /// # use opendal::Operator;
- /// use opendal::Entry;
- /// use opendal::Metakey;
- ///
- /// # #[tokio::main]
- /// # async fn test(op: Operator, entry: Entry) -> Result<()> {
- /// let meta = op
- /// .metadata(&entry, Metakey::ContentLength | Metakey::ContentType)
- /// .await?;
- /// // content length MUST be correct.
- /// let _ = meta.content_length();
- /// // etag COULD be correct.
- /// let _ = meta.etag();
- /// # Ok(())
- /// # }
- /// ```
- ///
- /// ## Query all metadata
- ///
- /// By querying metadata with `Complete`, we can make sure that we have
fetched all metadata of this entry.
- ///
- /// ```
- /// # use anyhow::Result;
- /// # use opendal::Operator;
- /// use opendal::Entry;
- /// use opendal::Metakey;
- ///
- /// # #[tokio::main]
- /// # async fn test(op: Operator, entry: Entry) -> Result<()> {
- /// let meta = op.metadata(&entry, Metakey::Complete).await?;
- /// // content length MUST be correct.
- /// let _ = meta.content_length();
- /// // etag MUST be correct.
- /// let _ = meta.etag();
- /// # Ok(())
- /// # }
- /// ```
- pub async fn metadata(
- &self,
- entry: &Entry,
- flags: impl Into<FlagSet<Metakey>>,
- ) -> Result<Metadata> {
- // Check if cached metadata saticifies the query.
- if let Some(meta) = entry.metadata() {
- if meta.bit().contains(flags) ||
meta.bit().contains(Metakey::Complete) {
- return Ok(meta.clone());
- }
- }
-
- // Else request from backend..
- let meta = self.stat(entry.path()).await?;
- Ok(meta)
- }
-
/// Check if this path exists or not.
///
/// # Example
@@ -1270,13 +1163,13 @@ impl Operator {
///
/// # Notes
///
- /// ## For listing recursively
+ /// ## Listing recursively
///
/// This function only read the children of the given directory. To read
/// all entries recursively, use
`Operator::list_with("path").delimiter("")`
/// instead.
///
- /// ## For streaming
+ /// ## Streaming
///
/// This function will read all entries in the given directory. It could
/// take very long time and consume a lot of memory if the directory
@@ -1285,6 +1178,11 @@ impl Operator {
/// In order to avoid this, you can use [`Operator::lister`] to list
entries in
/// a streaming way.
///
+ /// ## Metadata
+ ///
+ /// The only metadata that is guaranteed to be available is the `Mode`.
+ /// For fetching more metadata, please use [`Operator::list_with`] and
`metakey`.
+ ///
/// # Examples
///
/// ```no_run
@@ -1296,13 +1194,12 @@ impl Operator {
/// # async fn test(op: Operator) -> Result<()> {
/// let mut entries = op.list("path/to/dir/").await?;
/// for entry in entries {
- /// let meta = op.metadata(&entry, Metakey::Mode).await?;
- /// match meta.mode() {
+ /// match entry.metadata().mode() {
/// EntryMode::FILE => {
/// println!("Handling file")
/// }
/// EntryMode::DIR => {
- /// println!("Handling dir like start a new list via
meta.path()")
+ /// println!("Handling dir {}", entry.path())
/// }
/// EntryMode::Unknown => continue,
/// }
@@ -1327,6 +1224,11 @@ impl Operator {
/// In order to avoid this, you can use [`Operator::lister`] to list
entries in
/// a streaming way.
///
+ /// ## Metadata
+ ///
+ /// The only metadata that is guaranteed to be available is the `Mode`.
+ /// For fetching more metadata, please specify the `metakey`.
+ ///
/// # Examples
///
/// ## List entries with prefix
@@ -1342,8 +1244,7 @@ impl Operator {
/// # async fn test(op: Operator) -> Result<()> {
/// let mut entries = op.list_with("prefix/").delimiter("").await?;
/// for entry in entries {
- /// let meta = op.metadata(&entry, Metakey::Mode).await?;
- /// match meta.mode() {
+ /// match entry.metadata().mode() {
/// EntryMode::FILE => {
/// println!("Handling file")
/// }
@@ -1356,6 +1257,39 @@ impl Operator {
/// # Ok(())
/// # }
/// ```
+ ///
+ /// ## List entries with metakey for more metadata
+ ///
+ /// ```no_run
+ /// # use anyhow::Result;
+ /// use opendal::EntryMode;
+ /// use opendal::Metakey;
+ /// use opendal::Operator;
+ /// # #[tokio::main]
+ /// # async fn test(op: Operator) -> Result<()> {
+ /// let mut entries = op
+ /// .list_with("dir/")
+ /// .metakey(Metakey::ContentLength | Metakey::LastModified)
+ /// .await?;
+ /// for entry in entries {
+ /// let meta = entry.metadata();
+ /// match meta.mode() {
+ /// EntryMode::FILE => {
+ /// println!(
+ /// "Handling file {} with size {}",
+ /// entry.path(),
+ /// meta.content_length()
+ /// )
+ /// }
+ /// EntryMode::DIR => {
+ /// println!("Handling dir {}", entry.path())
+ /// }
+ /// EntryMode::Unknown => continue,
+ /// }
+ /// }
+ /// # Ok(())
+ /// # }
+ /// ```
pub fn list_with(&self, path: &str) -> FutureList {
let path = normalize_path(path);
@@ -1375,8 +1309,7 @@ impl Operator {
.with_context("path", &path));
}
- let (_, pager) = inner.list(&path, args).await?;
- let lister = Lister::new(pager);
+ let lister = Lister::create(inner, &path, args).await?;
lister.try_collect().await
};
@@ -1392,6 +1325,19 @@ impl Operator {
///
/// An error will be returned if given path doesn't end with `/`.
///
+ /// # Notes
+ ///
+ /// ## Listing recursively
+ ///
+ /// This function only reads the children of the given directory. To read
+ /// all entries recursively, use [`Operator::lister_with`] and
`delimiter("")`
+ /// instead.
+ ///
+ /// ## Metadata
+ ///
+ /// The only metadata that is guaranteed to be available is the `Mode`.
+ /// For fetching more metadata, please use [`Operator::lister_with`] and
`metakey`.
+ ///
/// # Examples
///
/// ```no_run
@@ -1405,8 +1351,7 @@ impl Operator {
/// # async fn test(op: Operator) -> Result<()> {
/// let mut ds = op.lister("path/to/dir/").await?;
/// while let Some(mut de) = ds.try_next().await? {
- /// let meta = op.metadata(&de, Metakey::Mode).await?;
- /// match meta.mode() {
+ /// match de.metadata().mode() {
/// EntryMode::FILE => {
/// println!("Handling file")
/// }
@@ -1447,14 +1392,13 @@ impl Operator {
/// .limit(10)
/// .start_after("start")
/// .await?;
- /// while let Some(mut de) = ds.try_next().await? {
- /// let meta = op.metadata(&de, Metakey::Mode).await?;
- /// match meta.mode() {
+ /// while let Some(mut entry) = ds.try_next().await? {
+ /// match entry.metadata().mode() {
/// EntryMode::FILE => {
- /// println!("Handling file")
+ /// println!("Handling file {}", entry.path())
/// }
/// EntryMode::DIR => {
- /// println!("Handling dir like start a new list via
meta.path()")
+ /// println!("Handling dir {}", entry.path())
/// }
/// EntryMode::Unknown => continue,
/// }
@@ -1475,14 +1419,48 @@ impl Operator {
/// # #[tokio::main]
/// # async fn test(op: Operator) -> Result<()> {
/// let mut ds = op.lister_with("path/to/dir/").delimiter("").await?;
- /// while let Some(mut de) = ds.try_next().await? {
- /// let meta = op.metadata(&de, Metakey::Mode).await?;
+ /// while let Some(mut entry) = ds.try_next().await? {
+ /// match entry.metadata().mode() {
+ /// EntryMode::FILE => {
+ /// println!("Handling file {}", entry.path())
+ /// }
+ /// EntryMode::DIR => {
+ /// println!("Handling dir {}", entry.path())
+ /// }
+ /// EntryMode::Unknown => continue,
+ /// }
+ /// }
+ /// # Ok(())
+ /// # }
+ /// ```
+ ///
+ /// ## List files with required metadata
+ ///
+ /// ```no_run
+ /// # use anyhow::Result;
+ /// # use futures::io;
+ /// use futures::TryStreamExt;
+ /// use opendal::EntryMode;
+ /// use opendal::Metakey;
+ /// use opendal::Operator;
+ /// # #[tokio::main]
+ /// # async fn test(op: Operator) -> Result<()> {
+ /// let mut ds = op
+ /// .lister_with("path/to/dir/")
+ /// .metakey(Metakey::ContentLength | Metakey::LastModified)
+ /// .await?;
+ /// while let Some(mut entry) = ds.try_next().await? {
+ /// let meta = entry.metadata();
/// match meta.mode() {
/// EntryMode::FILE => {
- /// println!("Handling file")
+ /// println!(
+ /// "Handling file {} with size {}",
+ /// entry.path(),
+ /// meta.content_length()
+ /// )
/// }
/// EntryMode::DIR => {
- /// println!("Handling dir like start a new list via
meta.path()")
+ /// println!("Handling dir {}", entry.path())
/// }
/// EntryMode::Unknown => continue,
/// }
@@ -1509,9 +1487,7 @@ impl Operator {
.with_context("path", &path));
}
- let (_, pager) = inner.list(&path, args).await?;
-
- Ok(Lister::new(pager))
+ Lister::create(inner, &path, args).await
};
Box::pin(fut)
},
diff --git a/core/src/types/operator/operator_futures.rs
b/core/src/types/operator/operator_futures.rs
index 68e646788..42ec83311 100644
--- a/core/src/types/operator/operator_futures.rs
+++ b/core/src/types/operator/operator_futures.rs
@@ -27,6 +27,7 @@ use std::task::Poll;
use std::time::Duration;
use bytes::Bytes;
+use flagset::FlagSet;
use futures::future::BoxFuture;
use futures::Future;
use futures::FutureExt;
@@ -582,6 +583,12 @@ impl FutureList {
self.0 = self.0.map_args(|args| args.with_delimiter(v));
self
}
+
+ /// Change the metakey. The default metakey is `Metakey::Mode`.
+ pub fn metakey(mut self, v: impl Into<FlagSet<Metakey>>) -> Self {
+ self.0 = self.0.map_args(|args| args.with_metakey(v));
+ self
+ }
}
impl Future for FutureList {
@@ -615,6 +622,12 @@ impl FutureLister {
self.0 = self.0.map_args(|args| args.with_delimiter(v));
self
}
+
+ /// Change the metakey. The default metakey is `Metakey::Mode`.
+ pub fn metakey(mut self, v: impl Into<FlagSet<Metakey>>) -> Self {
+ self.0 = self.0.map_args(|args| args.with_metakey(v));
+ self
+ }
}
impl Future for FutureLister {
diff --git a/core/tests/behavior/fuzz.rs b/core/tests/behavior/fuzz.rs
index 9e504eaff..cca74763b 100644
--- a/core/tests/behavior/fuzz.rs
+++ b/core/tests/behavior/fuzz.rs
@@ -15,8 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+use std::io;
use std::io::SeekFrom;
-use std::{io, vec};
+use std::vec;
use anyhow::Result;
use futures::AsyncSeekExt;
diff --git a/core/tests/behavior/list.rs b/core/tests/behavior/list.rs
index ed7067600..79a36fe87 100644
--- a/core/tests/behavior/list.rs
+++ b/core/tests/behavior/list.rs
@@ -37,6 +37,8 @@ pub fn behavior_list_tests(op: &Operator) -> Vec<Trial> {
op,
test_check,
test_list_dir,
+ test_list_dir_with_metakey,
+ test_list_dir_with_metakey_complete,
test_list_rich_dir,
test_list_empty_dir,
test_list_non_exist_dir,
@@ -84,6 +86,95 @@ pub async fn test_list_dir(op: Operator) -> Result<()> {
Ok(())
}
+/// List dir with metakey
+pub async fn test_list_dir_with_metakey(op: Operator) -> Result<()> {
+ let parent = uuid::Uuid::new_v4().to_string();
+ let path = format!("{parent}/{}", uuid::Uuid::new_v4());
+ debug!("Generate a random file: {}", &path);
+ let (content, size) = gen_bytes();
+
+ op.write(&path, content).await.expect("write must succeed");
+
+ let mut obs = op
+ .lister_with(&format!("{parent}/"))
+ .metakey(
+ Metakey::Mode
+ | Metakey::CacheControl
+ | Metakey::ContentDisposition
+ | Metakey::ContentLength
+ | Metakey::ContentMd5
+ | Metakey::ContentRange
+ | Metakey::ContentType
+ | Metakey::Etag
+ | Metakey::LastModified
+ | Metakey::Version,
+ )
+ .await?;
+ let mut found = false;
+ while let Some(de) = obs.try_next().await? {
+ let meta = de.metadata();
+ if de.path() == path {
+ assert_eq!(meta.mode(), EntryMode::FILE);
+ assert_eq!(meta.content_length(), size as u64);
+
+ // We don't care about the value; we just want to check there is no
panic.
+ let _ = meta.cache_control();
+ let _ = meta.content_disposition();
+ let _ = meta.content_md5();
+ let _ = meta.content_range();
+ let _ = meta.content_type();
+ let _ = meta.etag();
+ let _ = meta.last_modified();
+ let _ = meta.version();
+
+ found = true
+ }
+ }
+ assert!(found, "file should be found in list");
+
+ op.delete(&path).await.expect("delete must succeed");
+ Ok(())
+}
+
+/// List dir with metakey complete
+pub async fn test_list_dir_with_metakey_complete(op: Operator) -> Result<()> {
+ let parent = uuid::Uuid::new_v4().to_string();
+ let path = format!("{parent}/{}", uuid::Uuid::new_v4());
+ debug!("Generate a random file: {}", &path);
+ let (content, size) = gen_bytes();
+
+ op.write(&path, content).await.expect("write must succeed");
+
+ let mut obs = op
+ .lister_with(&format!("{parent}/"))
+ .metakey(Metakey::Complete)
+ .await?;
+ let mut found = false;
+ while let Some(de) = obs.try_next().await? {
+ let meta = de.metadata();
+ if de.path() == path {
+ assert_eq!(meta.mode(), EntryMode::FILE);
+ assert_eq!(meta.content_length(), size as u64);
+
+ // We don't care about the value; we just want to check there is no
panic.
+ let _ = meta.cache_control();
+ let _ = meta.content_disposition();
+ let _ = meta.content_md5();
+ let _ = meta.content_range();
+ let _ = meta.content_type();
+ let _ = meta.etag();
+ let _ = meta.last_modified();
+ let _ = meta.version();
+
+ found = true
+ }
+ }
+ assert!(found, "file should be found in list");
+
+ op.delete(&path).await.expect("delete must succeed");
+ Ok(())
+}
+
/// listing a directory, which contains more objects than a single page can
take.
pub async fn test_list_rich_dir(op: Operator) -> Result<()> {
op.create_dir("test_list_rich_dir/").await?;