This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c39f170e feat: Add async list with metakey support (#2803)
9c39f170e is described below

commit 9c39f170e50a7f0b64fdf730a8d46ceb2b0a9a1a
Author: Xuanwo <[email protected]>
AuthorDate: Mon Aug 7 19:37:34 2023 +0800

    feat: Add async list with metakey support (#2803)
    
    * feat: Add async list with metakey support
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * format code
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * regen opendal.h
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix build
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix build of oay
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Add metakey test
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Add docs
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix build for object_store
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix doc test
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix unit test
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 bin/oay/src/services/s3/service.rs                 |  18 +-
 bin/oli/src/commands/cp.rs                         |   3 +-
 bindings/c/include/opendal.h                       |  43 +++-
 bindings/object_store/src/lib.rs                   |  23 +-
 core/src/layers/async_backtrace.rs                 |   4 +-
 core/src/layers/await_tree.rs                      |   7 +-
 core/src/layers/immutable_index.rs                 |  25 +--
 core/src/raw/oio/entry.rs                          |   2 +-
 .../raw/oio/read/into_seekable_read_by_range.rs    |   2 +-
 core/src/raw/oio/write/multipart_upload_write.rs   |   3 +-
 core/src/raw/ops.rs                                |  20 ++
 core/src/services/tikv/backend.rs                  |  15 +-
 core/src/types/entry.rs                            | 174 ++++++++++++---
 core/src/types/list.rs                             | 153 +++++--------
 core/src/types/metadata.rs                         |  52 +++++
 core/src/types/operator/blocking_operator.rs       | 110 +---------
 core/src/types/operator/operator.rs                | 244 ++++++++++-----------
 core/src/types/operator/operator_futures.rs        |  13 ++
 core/tests/behavior/fuzz.rs                        |   3 +-
 core/tests/behavior/list.rs                        |  91 ++++++++
 20 files changed, 570 insertions(+), 435 deletions(-)

diff --git a/bin/oay/src/services/s3/service.rs 
b/bin/oay/src/services/s3/service.rs
index 3fec8625f..a41130112 100644
--- a/bin/oay/src/services/s3/service.rs
+++ b/bin/oay/src/services/s3/service.rs
@@ -25,6 +25,7 @@ use axum::response::Response;
 use axum::routing::get;
 use axum::Router;
 use chrono::SecondsFormat;
+use futures_util::StreamExt;
 use opendal::Metakey;
 use opendal::Operator;
 use serde::Deserialize;
@@ -103,21 +104,18 @@ async fn handle_list_objects(
         .op
         .lister_with(&params.prefix)
         .start_after(&params.start_after)
-        .await?;
+        .metakey(Metakey::Mode | Metakey::LastModified | Metakey::Etag | 
Metakey::ContentLength)
+        .await?
+        .chunks(1000);
 
-    let page = lister.next_page().await?.unwrap_or_default();
+    let page = lister.next().await.unwrap_or_default();
 
-    let is_truncated = lister.has_next().await?;
+    let is_truncated = page.len() >= 1000;
 
     let (mut common_prefixes, mut contents) = (vec![], vec![]);
     for v in page {
-        let meta = state
-            .op
-            .metadata(
-                &v,
-                Metakey::Mode | Metakey::LastModified | Metakey::Etag | 
Metakey::ContentLength,
-            )
-            .await?;
+        let v = v?;
+        let meta = v.metadata();
 
         if meta.is_dir() {
             common_prefixes.push(CommonPrefix {
diff --git a/bin/oli/src/commands/cp.rs b/bin/oli/src/commands/cp.rs
index 9bb40bad4..5f17f4fa9 100644
--- a/bin/oli/src/commands/cp.rs
+++ b/bin/oli/src/commands/cp.rs
@@ -25,7 +25,6 @@ use clap::ArgAction;
 use clap::ArgMatches;
 use clap::Command;
 use futures::TryStreamExt;
-use opendal::Metakey;
 
 use crate::config::Config;
 
@@ -59,7 +58,7 @@ pub async fn main(args: &ArgMatches) -> Result<()> {
     let dst_root = Path::new(&dst_path);
     let mut ds = src_op.lister_with(&src_path).delimiter("").await?;
     while let Some(de) = ds.try_next().await? {
-        let meta = src_op.metadata(&de, Metakey::Mode).await?;
+        let meta = de.metadata();
         if meta.mode().is_dir() {
             continue;
         }
diff --git a/bindings/c/include/opendal.h b/bindings/c/include/opendal.h
index d83356324..bc47bf5b5 100644
--- a/bindings/c/include/opendal.h
+++ b/bindings/c/include/opendal.h
@@ -85,7 +85,7 @@ typedef enum opendal_code {
  * BlockingLister is designed to list entries at given path in a blocking
  * manner.
  *
- * Users can construct Lister by `blocking_list` or `blocking_scan`.
+ * Users can construct Lister by `blocking_lister`.
  */
 typedef struct BlockingLister BlockingLister;
 
@@ -122,7 +122,46 @@ typedef struct BlockingLister BlockingLister;
 typedef struct BlockingOperator BlockingOperator;
 
 /**
- * Entry is the file/dir entry returned by `Lister`.
+ * Entry returned by [`Lister`] or [`BlockingLister`] to represent a path and 
its related metadata.
+ *
+ * # Notes
+ *
+ * Entry returned by [`Lister`] or [`BlockingLister`] may carry some already 
known metadata.
+ * Lister by default only makes sure that `Mode` is fetched. To make sure the 
entry contains
+ * metadata you want, please use `list_with` or `lister_with` and `metakey`.
+ *
+ * For example:
+ *
+ * ```no_run
+ * # use anyhow::Result;
+ * use opendal::EntryMode;
+ * use opendal::Metakey;
+ * use opendal::Operator;
+ * # #[tokio::main]
+ * # async fn test(op: Operator) -> Result<()> {
+ * let mut entries = op
+ *     .list_with("dir/")
+ *     .metakey(Metakey::ContentLength | Metakey::LastModified)
+ *     .await?;
+ * for entry in entries {
+ *     let meta = entry.metadata();
+ *     match meta.mode() {
+ *         EntryMode::FILE => {
+ *             println!(
+ *                 "Handling file {} with size {}",
+ *                 entry.path(),
+ *                 meta.content_length()
+ *             )
+ *         }
+ *         EntryMode::DIR => {
+ *             println!("Handling dir {}", entry.path())
+ *         }
+ *         EntryMode::Unknown => continue,
+ *     }
+ * }
+ * # Ok(())
+ * # }
+ * ```
  */
 typedef struct Entry Entry;
 
diff --git a/bindings/object_store/src/lib.rs b/bindings/object_store/src/lib.rs
index 4e6784639..c05aaa231 100644
--- a/bindings/object_store/src/lib.rs
+++ b/bindings/object_store/src/lib.rs
@@ -149,19 +149,16 @@ impl ObjectStore for OpendalStore {
         let stream = self
             .inner
             .lister_with(&path)
+            .metakey(Metakey::ContentLength | Metakey::LastModified)
             .delimiter("")
             .await
             .map_err(|err| format_object_store_error(err, &path))?;
 
         let stream = stream.then(|res| async {
             let entry = res.map_err(|err| format_object_store_error(err, ""))?;
-            let meta = self
-                .inner
-                .metadata(&entry, Metakey::ContentLength | 
Metakey::LastModified)
-                .await
-                .map_err(|err| format_object_store_error(err, entry.path()))?;
+            let meta = entry.metadata();
 
-            Ok(format_object_meta(entry.path(), &meta))
+            Ok(format_object_meta(entry.path(), meta))
         });
 
         Ok(stream.boxed())
@@ -171,7 +168,8 @@ impl ObjectStore for OpendalStore {
         let path = prefix.map_or("".into(), |x| format!("{}/", x));
         let mut stream = self
             .inner
-            .lister(&path)
+            .lister_with(&path)
+            .metakey(Metakey::Mode | Metakey::ContentLength | 
Metakey::LastModified)
             .await
             .map_err(|err| format_object_store_error(err, &path))?;
 
@@ -180,19 +178,12 @@ impl ObjectStore for OpendalStore {
 
         while let Some(res) = stream.next().await {
             let entry = res.map_err(|err| format_object_store_error(err, ""))?;
-            let meta = self
-                .inner
-                .metadata(
-                    &entry,
-                    Metakey::Mode | Metakey::ContentLength | 
Metakey::LastModified,
-                )
-                .await
-                .map_err(|err| format_object_store_error(err, entry.path()))?;
+            let meta = entry.metadata();
 
             if meta.is_dir() {
                 common_prefixes.push(entry.path().into());
             } else {
-                objects.push(format_object_meta(entry.path(), &meta));
+                objects.push(format_object_meta(entry.path(), meta));
             }
         }
 
diff --git a/core/src/layers/async_backtrace.rs 
b/core/src/layers/async_backtrace.rs
index 5dccf98be..7ce2d8757 100644
--- a/core/src/layers/async_backtrace.rs
+++ b/core/src/layers/async_backtrace.rs
@@ -15,11 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use async_trait::async_trait;
+
 use crate::raw::*;
 use crate::*;
 
-use async_trait::async_trait;
-
 /// Add Efficient, logical 'stack' traces of async functions for the 
underlying services.
 ///
 /// # Async Backtrace
diff --git a/core/src/layers/await_tree.rs b/core/src/layers/await_tree.rs
index 583462df6..6f6d32b0b 100644
--- a/core/src/layers/await_tree.rs
+++ b/core/src/layers/await_tree.rs
@@ -15,13 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::raw::*;
-use crate::*;
-
 use async_trait::async_trait;
-
 use await_tree::InstrumentAwait;
 
+use crate::raw::*;
+use crate::*;
+
 /// Add a Instrument await-tree for actor-based applications to the underlying 
services.
 ///
 /// # AwaitTree
diff --git a/core/src/layers/immutable_index.rs 
b/core/src/layers/immutable_index.rs
index cc2cf1907..374482151 100644
--- a/core/src/layers/immutable_index.rs
+++ b/core/src/layers/immutable_index.rs
@@ -310,10 +310,7 @@ mod tests {
                 "duplicated value: {}",
                 entry.path()
             );
-            map.insert(
-                entry.path().to_string(),
-                op.metadata(&entry, Metakey::Mode).await?.mode(),
-            );
+            map.insert(entry.path().to_string(), entry.metadata().mode());
         }
 
         assert_eq!(map["file"], EntryMode::FILE);
@@ -351,10 +348,7 @@ mod tests {
                 "duplicated value: {}",
                 entry.path()
             );
-            map.insert(
-                entry.path().to_string(),
-                op.metadata(&entry, Metakey::Mode).await?.mode(),
-            );
+            map.insert(entry.path().to_string(), entry.metadata().mode());
         }
 
         debug!("current files: {:?}", map);
@@ -398,10 +392,7 @@ mod tests {
                 "duplicated value: {}",
                 entry.path()
             );
-            map.insert(
-                entry.path().to_string(),
-                op.metadata(&entry, Metakey::Mode).await?.mode(),
-            );
+            map.insert(entry.path().to_string(), entry.metadata().mode());
         }
 
         assert_eq!(map.len(), 1);
@@ -417,10 +408,7 @@ mod tests {
                 "duplicated value: {}",
                 entry.path()
             );
-            map.insert(
-                entry.path().to_string(),
-                op.metadata(&entry, Metakey::Mode).await?.mode(),
-            );
+            map.insert(entry.path().to_string(), entry.metadata().mode());
         }
 
         assert_eq!(map["dataset/stateful/ontime_2007_200.csv"], 
EntryMode::FILE);
@@ -462,10 +450,7 @@ mod tests {
                 "duplicated value: {}",
                 entry.path()
             );
-            map.insert(
-                entry.path().to_string(),
-                op.metadata(&entry, Metakey::Mode).await?.mode(),
-            );
+            map.insert(entry.path().to_string(), entry.metadata().mode());
         }
 
         debug!("current files: {:?}", map);
diff --git a/core/src/raw/oio/entry.rs b/core/src/raw/oio/entry.rs
index f476aa887..43c4b9631 100644
--- a/core/src/raw/oio/entry.rs
+++ b/core/src/raw/oio/entry.rs
@@ -79,6 +79,6 @@ impl Entry {
     ///
     /// NOTE: implement this by hand to avoid leaking raw entry to end-users.
     pub(crate) fn into_entry(self) -> crate::Entry {
-        crate::Entry::new_with(self.path, self.meta)
+        crate::Entry::new(self.path, self.meta)
     }
 }
diff --git a/core/src/raw/oio/read/into_seekable_read_by_range.rs 
b/core/src/raw/oio/read/into_seekable_read_by_range.rs
index a15c99e2a..b996c61e9 100644
--- a/core/src/raw/oio/read/into_seekable_read_by_range.rs
+++ b/core/src/raw/oio/read/into_seekable_read_by_range.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use bytes::Bytes;
 use std::future::Future;
 use std::io::SeekFrom;
 use std::pin::Pin;
@@ -24,6 +23,7 @@ use std::task::ready;
 use std::task::Context;
 use std::task::Poll;
 
+use bytes::Bytes;
 use futures::future::BoxFuture;
 
 use crate::raw::*;
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs 
b/core/src/raw/oio/write/multipart_upload_write.rs
index 0fe035439..8ca10c462 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -18,7 +18,8 @@
 use async_trait::async_trait;
 use bytes::Bytes;
 
-use crate::{raw::*, *};
+use crate::raw::*;
+use crate::*;
 
 const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;
 
diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs
index 308930b5c..715892f0c 100644
--- a/core/src/raw/ops.rs
+++ b/core/src/raw/ops.rs
@@ -21,7 +21,10 @@
 
 use std::time::Duration;
 
+use flagset::FlagSet;
+
 use crate::raw::*;
+use crate::Metakey;
 
 /// Args for `create` operation.
 ///
@@ -77,6 +80,8 @@ pub struct OpList {
 
     /// The delimiter used to for the list operation. Default to be `/`
     delimiter: String,
+
+    metakey: FlagSet<Metakey>,
 }
 
 impl Default for OpList {
@@ -85,6 +90,8 @@ impl Default for OpList {
             limit: None,
             start_after: None,
             delimiter: "/".to_string(),
+            // By default, we want to know what's the mode of this entry.
+            metakey: Metakey::Mode.into(),
         }
     }
 }
@@ -127,6 +134,19 @@ impl OpList {
     pub fn delimiter(&self) -> &str {
         &self.delimiter
     }
+
+    /// Change the metakey of this list operation.
+    ///
+    /// The default metakey is `Metakey::Mode`.
+    pub fn with_metakey(mut self, metakey: impl Into<FlagSet<Metakey>>) -> 
Self {
+        self.metakey = metakey.into();
+        self
+    }
+
+    /// Get the current metakey.
+    pub fn metakey(&self) -> FlagSet<Metakey> {
+        self.metakey
+    }
 }
 
 /// Args for `presign` operation.
diff --git a/core/src/services/tikv/backend.rs 
b/core/src/services/tikv/backend.rs
index f70d6bb5a..0a9417181 100644
--- a/core/src/services/tikv/backend.rs
+++ b/core/src/services/tikv/backend.rs
@@ -16,23 +16,22 @@
 // under the License.
 
 use std::collections::HashMap;
-use tikv_client::Config;
-use tikv_client::RawClient;
+use std::fmt::Debug;
+use std::fmt::Formatter;
 
-use crate::raw::adapters::kv;
-use crate::Capability;
-use crate::Scheme;
 use async_trait::async_trait;
+use tikv_client::Config;
+use tikv_client::RawClient;
 use tokio::sync::OnceCell;
 
+use crate::raw::adapters::kv;
 use crate::Builder;
+use crate::Capability;
 use crate::Error;
 use crate::ErrorKind;
+use crate::Scheme;
 use crate::*;
 
-use std::fmt::Debug;
-use std::fmt::Formatter;
-
 /// TiKV backend builder
 #[derive(Clone, Default)]
 pub struct TikvBuilder {
diff --git a/core/src/types/entry.rs b/core/src/types/entry.rs
index d0a8be2db..d6a062828 100644
--- a/core/src/types/entry.rs
+++ b/core/src/types/entry.rs
@@ -18,63 +18,179 @@
 use crate::raw::*;
 use crate::*;
 
-/// Entry is the file/dir entry returned by `Lister`.
+/// Entry returned by [`Lister`] or [`BlockingLister`] to represent a path and 
its related metadata.
+///
+/// # Notes
+///
+/// Entry returned by [`Lister`] or [`BlockingLister`] may carry some already 
known metadata.
+/// Lister by default only makes sure that `Mode` is fetched. To make sure the 
entry contains
+/// metadata you want, please use `list_with` or `lister_with` and `metakey`.
+///
+/// For example:
+///
+/// ```no_run
+/// # use anyhow::Result;
+/// use opendal::EntryMode;
+/// use opendal::Metakey;
+/// use opendal::Operator;
+/// # #[tokio::main]
+/// # async fn test(op: Operator) -> Result<()> {
+/// let mut entries = op
+///     .list_with("dir/")
+///     .metakey(Metakey::ContentLength | Metakey::LastModified)
+///     .await?;
+/// for entry in entries {
+///     let meta = entry.metadata();
+///     match meta.mode() {
+///         EntryMode::FILE => {
+///             println!(
+///                 "Handling file {} with size {}",
+///                 entry.path(),
+///                 meta.content_length()
+///             )
+///         }
+///         EntryMode::DIR => {
+///             println!("Handling dir {}", entry.path())
+///         }
+///         EntryMode::Unknown => continue,
+///     }
+/// }
+/// # Ok(())
+/// # }
+/// ```
 #[derive(Clone, Debug)]
 pub struct Entry {
-    /// Path of the entry.
+    /// Path of this entry.
     path: String,
 
-    /// Optional cached metadata
-    metadata: Option<Metadata>,
+    /// Metadata of this entry.
+    metadata: Metadata,
 }
 
 impl Entry {
-    /// Create an entry with .
+    /// Create an entry with metadata.
     ///
     /// # Notes
     ///
-    /// This function is crate internal only. Users don't have public
-    /// methods to construct an entry with arbitrary cached metadata.
-    ///
     /// The only way to get an entry with associated cached metadata
-    /// is `Operator::list` or `Operator::scan`.
-    pub(crate) fn new_with(path: String, metadata: Metadata) -> Self {
-        Self {
-            path,
-            metadata: Some(metadata),
-        }
-    }
-
-    /// Create an [`Entry`] with empty cached metadata.
-    pub fn new(path: &str) -> Self {
-        Self {
-            path: normalize_path(path),
-            metadata: None,
-        }
+    /// is `Operator::list`.
+    pub(crate) fn new(path: String, metadata: Metadata) -> Self {
+        Self { path, metadata }
     }
 
     /// Path of entry. Path is relative to operator's root.
+    ///
     /// Only valid in current operator.
+    ///
+    /// If this entry is a dir, `path` MUST end with `/`
+    /// Otherwise, `path` MUST NOT end with `/`.
     pub fn path(&self) -> &str {
         &self.path
     }
 
     /// Name of entry. Name is the last segment of path.
     ///
-    /// If this entry is a dir, `Name` MUST endswith `/`
-    /// Otherwise, `Name` MUST NOT endswith `/`.
+    /// If this entry is a dir, `name` MUST end with `/`
+    /// Otherwise, `name` MUST NOT end with `/`.
     pub fn name(&self) -> &str {
         get_basename(&self.path)
     }
 
-    /// Get the cached metadata of entry.
+    /// Fetch metadata of this entry.
     ///
     /// # Notes
     ///
-    /// This function is crate internal only. Because the returning
-    /// metadata could be incomplete. Users must use `Operator::metadata`
-    /// to query the cached metadata instead.
-    pub(crate) fn metadata(&self) -> &Option<Metadata> {
+    /// Metadata is only guaranteed to have results of `metakey` (which defaults 
to `Metakey::Mode`).
+    ///
+    /// - `Some(T)` means the metadata is valid.
+    /// - `None` means the metadata is not provided by services.
+    ///
+    /// Visiting metadata that is not covered by `metakey` could result in a 
panic.
+    ///
+    /// # Examples
+    ///
+    /// Please use `metakey` to specify the metadata you want, for example:
+    ///
+    /// ```no_run
+    /// # use anyhow::Result;
+    /// use opendal::EntryMode;
+    /// use opendal::Metakey;
+    /// use opendal::Operator;
+    /// # #[tokio::main]
+    /// # async fn test(op: Operator) -> Result<()> {
+    /// let mut entries = op
+    ///     .list_with("dir/")
+    ///     .metakey(Metakey::ContentLength | Metakey::LastModified)
+    ///     .await?;
+    /// for entry in entries {
+    ///     let meta = entry.metadata();
+    ///     match meta.mode() {
+    ///         EntryMode::FILE => {
+    ///             println!(
+    ///                 "Handling file {} with size {}",
+    ///                 entry.path(),
+    ///                 meta.content_length()
+    ///             )
+    ///         }
+    ///         EntryMode::DIR => {
+    ///             println!("Handling dir {}", entry.path())
+    ///         }
+    ///         EntryMode::Unknown => continue,
+    ///     }
+    /// }
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn metadata(&self) -> &Metadata {
         &self.metadata
     }
+
+    /// Consume this entry to get its path and metadata.
+    ///
+    /// # Notes
+    ///
+    /// Metadata is only guaranteed to have results of `metakey` (which defaults 
to `Metakey::Mode`).
+    ///
+    /// - `Some(T)` means the metadata is valid.
+    /// - `None` means the metadata is not provided by services.
+    ///
+    /// Visiting metadata that is not covered by `metakey` could result in a 
panic.
+    ///
+    /// # Examples
+    ///
+    /// Please use `metakey` to specify the metadata you want, for example:
+    ///
+    /// ```no_run
+    /// # use anyhow::Result;
+    /// use opendal::EntryMode;
+    /// use opendal::Metakey;
+    /// use opendal::Operator;
+    /// # #[tokio::main]
+    /// # async fn test(op: Operator) -> Result<()> {
+    /// let mut entries = op
+    ///     .list_with("dir/")
+    ///     .metakey(Metakey::ContentLength | Metakey::LastModified)
+    ///     .await?;
+    /// for entry in entries {
+    ///     let (path, meta) = entry.into_parts();
+    ///     match meta.mode() {
+    ///         EntryMode::FILE => {
+    ///             println!(
+    ///                 "Handling file {} with size {}",
+    ///                 path,
+    ///                 meta.content_length()
+    ///             )
+    ///         }
+    ///         EntryMode::DIR => {
+    ///             println!("Handling dir {}", path)
+    ///         }
+    ///         EntryMode::Unknown => continue,
+    ///     }
+    /// }
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn into_parts(self) -> (String, Metadata) {
+        (self.path, self.metadata)
+    }
 }
diff --git a/core/src/types/list.rs b/core/src/types/list.rs
index 275a76db7..514056dba 100644
--- a/core/src/types/list.rs
+++ b/core/src/types/list.rs
@@ -16,12 +16,12 @@
 // under the License.
 
 use std::collections::VecDeque;
-use std::mem;
 use std::pin::Pin;
 use std::task::ready;
 use std::task::Context;
 use std::task::Poll;
 
+use flagset::FlagSet;
 use futures::future::BoxFuture;
 use futures::FutureExt;
 use futures::Stream;
@@ -29,22 +29,26 @@ use futures::Stream;
 use crate::raw::*;
 use crate::*;
 
+/// Future constructed by listing.
+type ListFuture = BoxFuture<'static, (oio::Pager, 
Result<Option<Vec<oio::Entry>>>)>;
+/// Future constructed by stating.
+type StatFuture = BoxFuture<'static, (String, Result<RpStat>)>;
+
 /// Lister is designed to list entries at given path in an asynchronous
 /// manner.
 ///
-/// Users can construct Lister by `list` or `scan`.
+/// Users can construct Lister by [`Operator::lister`].
 ///
-/// User can use lister as `Stream<Item = Result<Entry>>` or
-/// call `next_page` directly.
+/// User can use lister as `Stream<Item = Result<Entry>>`.
 pub struct Lister {
-    pager: Option<oio::Pager>,
+    acc: FusedAccessor,
+    /// required_metakey is the metakey required by users.
+    required_metakey: FlagSet<Metakey>,
 
     buf: VecDeque<oio::Entry>,
-    /// We will move `pager` inside future and return it back while future is 
ready.
-    /// Thus, we should not allow calling other function while we already have
-    /// a future.
-    #[allow(clippy::type_complexity)]
-    fut: Option<BoxFuture<'static, (oio::Pager, 
Result<Option<Vec<oio::Entry>>>)>>,
+    pager: Option<oio::Pager>,
+    listing: Option<ListFuture>,
+    stating: Option<StatFuture>,
 }
 
 /// # Safety
@@ -54,75 +58,19 @@ unsafe impl Sync for Lister {}
 
 impl Lister {
     /// Create a new lister.
-    pub(crate) fn new(pager: oio::Pager) -> Self {
-        Self {
-            pager: Some(pager),
-            buf: VecDeque::default(),
-            fut: None,
-        }
-    }
-
-    /// has_next can be used to check if there are more pages.
-    pub async fn has_next(&mut self) -> Result<bool> {
-        debug_assert!(
-            self.fut.is_none(),
-            "there are ongoing futures for next page"
-        );
+    pub(crate) async fn create(acc: FusedAccessor, path: &str, args: OpList) 
-> Result<Self> {
+        let required_metakey = args.metakey();
+        let (_, pager) = acc.list(path, args).await?;
 
-        if !self.buf.is_empty() {
-            return Ok(true);
-        }
-
-        let entries = match self
-            .pager
-            .as_mut()
-            .expect("pager must be valid")
-            .next()
-            .await?
-        {
-            // Ideally, the convert from `Vec` to `VecDeque` will not do 
reallocation.
-            //
-            // However, this could be changed as described in [impl<T, A> 
From<Vec<T, A>> for VecDeque<T, 
A>](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E)
-            Some(entries) => entries.into(),
-            None => return Ok(false),
-        };
-        // Push fetched entries into buffer.
-        self.buf = entries;
-
-        Ok(true)
-    }
+        Ok(Self {
+            acc,
+            required_metakey,
 
-    /// next_page can be used to fetch a new page.
-    ///
-    /// # Notes
-    ///
-    /// Don't mix the usage of `next_page` and `Stream<Item = Result<Entry>>`.
-    /// Always using the same calling style.
-    pub async fn next_page(&mut self) -> Result<Option<Vec<Entry>>> {
-        debug_assert!(
-            self.fut.is_none(),
-            "there are ongoing futures for next page"
-        );
-
-        let entries = if !self.buf.is_empty() {
-            mem::take(&mut self.buf)
-        } else {
-            match self
-                .pager
-                .as_mut()
-                .expect("pager must be valid")
-                .next()
-                .await?
-            {
-                // Ideally, the convert from `Vec` to `VecDeque` will not do 
reallocation.
-                //
-                // However, this could be changed as described in [impl<T, A> 
From<Vec<T, A>> for VecDeque<T, 
A>](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E)
-                Some(entries) => entries.into(),
-                None => return Ok(None),
-            }
-        };
-
-        Ok(Some(entries.into_iter().map(|v| v.into_entry()).collect()))
+            buf: VecDeque::new(),
+            pager: Some(pager),
+            listing: None,
+            stating: None,
+        })
     }
 }
 
@@ -130,22 +78,44 @@ impl Stream for Lister {
     type Item = Result<Entry>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
+        if let Some(fut) = self.stating.as_mut() {
+            let (path, rp) = ready!(fut.poll_unpin(cx));
+            let metadata = rp?.into_metadata();
+
+            self.stating = None;
+            return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
+        }
+
         if let Some(oe) = self.buf.pop_front() {
-            return Poll::Ready(Some(Ok(oe.into_entry())));
+            let (path, metadata) = oe.into_entry().into_parts();
+            // TODO: we can optimize this by checking the metakey 
provided by services.
+            if metadata.contains_bit(self.required_metakey) {
+                return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
+            }
+
+            let acc = self.acc.clone();
+            let fut = async move {
+                let path = path;
+                let res = acc.stat(&path, OpStat::default()).await;
+
+                (path, res)
+            };
+            self.stating = Some(Box::pin(fut));
+            return self.poll_next(cx);
         }
 
-        if let Some(fut) = self.fut.as_mut() {
+        if let Some(fut) = self.listing.as_mut() {
             let (op, res) = ready!(fut.poll_unpin(cx));
             self.pager = Some(op);
 
             return match res? {
                 Some(oes) => {
-                    self.fut = None;
+                    self.listing = None;
                     self.buf = oes.into();
                     self.poll_next(cx)
                 }
                 None => {
-                    self.fut = None;
+                    self.listing = None;
                     Poll::Ready(None)
                 }
             };
@@ -157,7 +127,7 @@ impl Stream for Lister {
 
             (pager, res)
         };
-        self.fut = Some(Box::pin(fut));
+        self.listing = Some(Box::pin(fut));
         self.poll_next(cx)
     }
 }
@@ -165,7 +135,7 @@ impl Stream for Lister {
 /// BlockingLister is designed to list entries at given path in a blocking
 /// manner.
 ///
-/// Users can construct Lister by `blocking_list` or `blocking_scan`.
+/// Users can construct Lister by `blocking_lister`.
 pub struct BlockingLister {
     pager: oio::BlockingPager,
     buf: VecDeque<oio::Entry>,
@@ -184,23 +154,6 @@ impl BlockingLister {
             buf: VecDeque::default(),
         }
     }
-
-    /// next_page can be used to fetch a new page.
-    pub fn next_page(&mut self) -> Result<Option<Vec<Entry>>> {
-        let entries = if !self.buf.is_empty() {
-            mem::take(&mut self.buf)
-        } else {
-            match self.pager.next()? {
-                // Ideally, the convert from `Vec` to `VecDeque` will not do 
reallocation.
-                //
-                // However, this could be changed as described in [impl<T, A> 
From<Vec<T, A>> for VecDeque<T, 
A>](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E)
-                Some(entries) => entries.into(),
-                None => return Ok(None),
-            }
-        };
-
-        Ok(Some(entries.into_iter().map(|v| v.into_entry()).collect()))
-    }
 }
 
 /// TODO: we can implement next_chunk.
diff --git a/core/src/types/metadata.rs b/core/src/types/metadata.rs
index 02f5bb90d..66a7b41d4 100644
--- a/core/src/types/metadata.rs
+++ b/core/src/types/metadata.rs
@@ -85,6 +85,18 @@ impl Metadata {
         self
     }
 
+    /// Check if the metadata already contains the given bit.
+    pub(crate) fn contains_bit(&self, bit: impl Into<FlagSet<Metakey>>) -> 
bool {
+        let input_bit = bit.into();
+
+        // If meta already contains complete, we don't need to check.
+        if self.bit.contains(Metakey::Complete) {
+            return true;
+        }
+
+        self.bit.contains(input_bit)
+    }
+
     /// mode represent this entry's mode.
     pub fn mode(&self) -> EntryMode {
         debug_assert!(
@@ -155,6 +167,11 @@ impl Metadata {
     ///
     /// `Content-Length` is defined by [RFC 
7230](https://httpwg.org/specs/rfc7230.html#header.content-length)
     /// Refer to [MDN 
Content-Length](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Length)
 for more information.
+    ///
+    /// # Panics
+    ///
+    /// This value is only available when calling on result of `stat` or 
`list` with
+    /// [`Metakey::ContentLength`], otherwise it will panic.
     pub fn content_length(&self) -> u64 {
         debug_assert!(
             self.bit.contains(Metakey::ContentLength) || 
self.bit.contains(Metakey::Complete),
@@ -189,6 +206,11 @@ impl Metadata {
     /// And removed by [RFC 7231](https://www.rfc-editor.org/rfc/rfc7231).
     ///
     /// OpenDAL will try its best to set this value, but not guarantee this 
value is the md5 of content.
+    ///
+    /// # Panics
+    ///
+    /// This value is only available when calling on result of `stat` or 
`list` with
+    /// [`Metakey::ContentMd5`], otherwise it will panic.
     pub fn content_md5(&self) -> Option<&str> {
         debug_assert!(
             self.bit.contains(Metakey::ContentMd5) || 
self.bit.contains(Metakey::Complete),
@@ -221,6 +243,11 @@ impl Metadata {
     /// Content Type of this entry.
     ///
     /// Content Type is defined by [RFC 
9110](https://httpwg.org/specs/rfc9110.html#field.content-type).
+    ///
+    /// # Panics
+    ///
+    /// This value is only available when calling on result of `stat` or 
`list` with
+    /// [`Metakey::ContentType`], otherwise it will panic.
     pub fn content_type(&self) -> Option<&str> {
         debug_assert!(
             self.bit.contains(Metakey::ContentType) || 
self.bit.contains(Metakey::Complete),
@@ -251,6 +278,11 @@ impl Metadata {
     /// Content Range of this entry.
     ///
     /// Content Range is defined by [RFC 
9110](https://httpwg.org/specs/rfc9110.html#field.content-range).
+    ///
+    /// # Panics
+    ///
+    /// This value is only available when calling on result of `stat` or 
`list` with
+    /// [`Metakey::ContentRange`], otherwise it will panic.
     pub fn content_range(&self) -> Option<BytesContentRange> {
         debug_assert!(
             self.bit.contains(Metakey::ContentRange) || 
self.bit.contains(Metakey::Complete),
@@ -284,6 +316,11 @@ impl Metadata {
     /// Refer to [MDN 
Last-Modified](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified)
 for more information.
     ///
     /// OpenDAL parse the raw value into [`DateTime`] for convenient.
+    ///
+    /// # Panics
+    ///
+    /// This value is only available when calling on result of `stat` or 
`list` with
+    /// [`Metakey::LastModified`], otherwise it will panic.
     pub fn last_modified(&self) -> Option<DateTime<Utc>> {
         debug_assert!(
             self.bit.contains(Metakey::LastModified) || 
self.bit.contains(Metakey::Complete),
@@ -324,6 +361,11 @@ impl Metadata {
     /// - `W/"0815"`
     ///
     /// `"` is part of etag.
+    ///
+    /// # Panics
+    ///
+    /// This value is only available when calling on result of `stat` or 
`list` with
+    /// [`Metakey::Etag`], otherwise it will panic.
     pub fn etag(&self) -> Option<&str> {
         debug_assert!(
             self.bit.contains(Metakey::Etag) || 
self.bit.contains(Metakey::Complete),
@@ -378,6 +420,11 @@ impl Metadata {
     /// - "inline"
     /// - "attachment"
     /// - "attachment; filename=\"filename.jpg\""
+    ///
+    /// # Panics
+    ///
+    /// This value is only available when calling on result of `stat` or 
`list` with
+    /// [`Metakey::ContentDisposition`], otherwise it will panic.
     pub fn content_disposition(&self) -> Option<&str> {
         debug_assert!(
             self.bit.contains(Metakey::ContentDisposition) || 
self.bit.contains(Metakey::Complete),
@@ -426,6 +473,11 @@ impl Metadata {
     /// Version is a string that can be used to identify the version of this 
entry.
     ///
     /// This field may come out from the version control system, like object 
versioning in AWS S3.
+    ///
+    /// # Panics
+    ///
+    /// This value is only available when calling on result of `stat` or 
`list` with
+    /// [`Metakey::Version`], otherwise it will panic.
     pub fn version(&self) -> Option<&str> {
         debug_assert!(
             self.bit.contains(Metakey::Version) || 
self.bit.contains(Metakey::Complete),
diff --git a/core/src/types/operator/blocking_operator.rs 
b/core/src/types/operator/blocking_operator.rs
index 2b479624a..5f8c6d279 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -19,7 +19,6 @@ use std::io::Read;
 use std::ops::RangeBounds;
 
 use bytes::Bytes;
-use flagset::FlagSet;
 
 use super::operator_functions::*;
 use crate::raw::*;
@@ -153,99 +152,6 @@ impl BlockingOperator {
         Ok(meta)
     }
 
-    /// Get current metadata with cache in blocking way.
-    ///
-    /// `metadata` will check the given query with already cached metadata
-    ///  first. And query from storage if not found.
-    ///
-    /// # Notes
-    ///
-    /// Use `metadata` if you are working with entries returned by
-    /// [`Lister`]. It's highly possible that metadata you want
-    /// has already been cached.
-    ///
-    /// You may want to use `stat`, if you:
-    ///
-    /// - Want detect the outside changes of file.
-    /// - Don't want to read from cached file metadata.
-    ///
-    /// # Behavior
-    ///
-    /// Visiting not fetched metadata will lead to panic in debug build.
-    /// It must be a bug, please fix it instead.
-    ///
-    /// # Examples
-    ///
-    /// ## Query already cached metadata
-    ///
-    /// By query metadata with `None`, we can only query in-memory metadata
-    /// cache. In this way, we can make sure that no API call will send.
-    ///
-    /// ```
-    /// # use anyhow::Result;
-    /// # use opendal::BlockingOperator;
-    /// use opendal::Entry;
-    ///
-    /// # fn test(op: BlockingOperator, entry: Entry) -> Result<()> {
-    /// let meta = op.metadata(&entry, None)?;
-    /// // content length COULD be correct.
-    /// let _ = meta.content_length();
-    /// // etag COULD be correct.
-    /// let _ = meta.etag();
-    /// # Ok(())
-    /// # }
-    /// ```
-    ///
-    /// ## Query content length and content type
-    ///
-    /// ```
-    /// # use anyhow::Result;
-    /// # use opendal::BlockingOperator;
-    /// use opendal::Entry;
-    /// use opendal::Metakey;
-    ///
-    /// # fn test(op: BlockingOperator, entry: Entry) -> Result<()> {
-    /// let meta = op.metadata(&entry, { Metakey::ContentLength | 
Metakey::ContentType })?;
-    /// // content length MUST be correct.
-    /// let _ = meta.content_length();
-    /// // etag COULD be correct.
-    /// let _ = meta.etag();
-    /// # Ok(())
-    /// # }
-    /// ```
-    ///
-    /// ## Query all metadata
-    ///
-    /// By query metadata with `Complete`, we can make sure that we have 
fetched all metadata of this entry.
-    ///
-    /// ```
-    /// # use anyhow::Result;
-    /// # use opendal::BlockingOperator;
-    /// use opendal::Entry;
-    /// use opendal::Metakey;
-    ///
-    /// # fn test(op: BlockingOperator, entry: Entry) -> Result<()> {
-    /// let meta = op.metadata(&entry, { Metakey::Complete })?;
-    /// // content length MUST be correct.
-    /// let _ = meta.content_length();
-    /// // etag MUST be correct.
-    /// let _ = meta.etag();
-    /// # Ok(())
-    /// # }
-    /// ```
-    pub fn metadata(&self, entry: &Entry, flags: impl Into<FlagSet<Metakey>>) 
-> Result<Metadata> {
-        // Check if cached metadata saticifies the query.
-        if let Some(meta) = entry.metadata() {
-            if meta.bit().contains(flags) || 
meta.bit().contains(Metakey::Complete) {
-                return Ok(meta.clone());
-            }
-        }
-
-        // Else request from backend..
-        let meta = self.stat(entry.path())?;
-        Ok(meta)
-    }
-
     /// Check if this path exists or not.
     ///
     /// # Example
@@ -824,11 +730,9 @@ impl BlockingOperator {
     /// # use opendal::EntryMode;
     /// # fn test(op: BlockingOperator) -> Result<()> {
     /// let mut ds = op.list("path/to/dir/")?;
-    /// while let Some(mut de) = ds.next() {
-    ///     let meta = op.metadata(&de?, {
-    ///         use opendal::Metakey::*;
-    ///         Mode
-    ///     })?;
+    /// while let Some(mut entry) = ds.next() {
+    ///     let entry = entry?;
+    ///     let meta = entry.metadata();
     ///     match meta.mode() {
     ///         EntryMode::FILE => {
     ///             println!("Handling file")
@@ -879,11 +783,9 @@ impl BlockingOperator {
     /// # use opendal::EntryMode;
     /// # fn test(op: BlockingOperator) -> Result<()> {
     /// let mut ds = op.list("path/to/dir/")?;
-    /// while let Some(mut de) = ds.next() {
-    ///     let meta = op.metadata(&de?, {
-    ///         use opendal::Metakey::*;
-    ///         Mode
-    ///     })?;
+    /// while let Some(mut entry) = ds.next() {
+    ///     let entry = entry?;
+    ///     let meta = entry.metadata();
     ///     match meta.mode() {
     ///         EntryMode::FILE => {
     ///             println!("Handling file")
diff --git a/core/src/types/operator/operator.rs 
b/core/src/types/operator/operator.rs
index f1cdb47ac..92959e726 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -19,7 +19,6 @@ use std::ops::RangeBounds;
 use std::time::Duration;
 
 use bytes::Bytes;
-use flagset::FlagSet;
 use futures::stream;
 use futures::AsyncReadExt;
 use futures::Stream;
@@ -159,18 +158,13 @@ impl Operator {
         }
     }
 
-    /// Get current path's metadata **without cache** directly.
+    /// Get current path's metadata.
     ///
     /// # Notes
     ///
-    /// Use `stat` if you:
-    ///
-    /// - Want to detect the outside changes of path.
-    /// - Don't want to read from cached metadata.
-    ///
-    /// You may want to use `metadata` if you are working with entries
-    /// returned by [`Lister`]. It's highly possible that metadata
-    /// you want has already been cached.
+    /// To fetch metadata of entries returned by [`Lister`], it's better to 
use [`list_with`] and
+    /// [`lister_with`] with `metakey` query like `Metakey::ContentLength | 
Metakey::LastModified`
+    /// so that we can avoid extra requests.
     ///
     /// # Examples
     ///
@@ -245,107 +239,6 @@ impl Operator {
         fut
     }
 
-    /// Get current metadata with cache.
-    ///
-    /// `metadata` will check the given query with already cached metadata
-    ///  first. And query from storage if not found.
-    ///
-    /// # Notes
-    ///
-    /// Use `metadata` if you are working with entries returned by
-    /// [`Lister`]. It's highly possible that metadata you want
-    /// has already been cached.
-    ///
-    /// You may want to use `stat`, if you:
-    ///
-    /// - Want to detect the outside changes of path.
-    /// - Don't want to read from cached metadata.
-    ///
-    /// # Behavior
-    ///
-    /// Visiting not fetched metadata will lead to panic in debug build.
-    /// It must be a bug, please fix it instead.
-    ///
-    /// # Examples
-    ///
-    /// ## Query already cached metadata
-    ///
-    /// By querying metadata with `None`, we can only query in-memory metadata
-    /// cache. In this way, we can make sure that no API call will be sent.
-    ///
-    /// ```
-    /// # use anyhow::Result;
-    /// # use opendal::Operator;
-    /// use opendal::Entry;
-    /// # #[tokio::main]
-    /// # async fn test(op: Operator, entry: Entry) -> Result<()> {
-    /// let meta = op.metadata(&entry, None).await?;
-    /// // content length COULD be correct.
-    /// let _ = meta.content_length();
-    /// // etag COULD be correct.
-    /// let _ = meta.etag();
-    /// # Ok(())
-    /// # }
-    /// ```
-    ///
-    /// ## Query content length and content type
-    ///
-    /// ```
-    /// # use anyhow::Result;
-    /// # use opendal::Operator;
-    /// use opendal::Entry;
-    /// use opendal::Metakey;
-    ///
-    /// # #[tokio::main]
-    /// # async fn test(op: Operator, entry: Entry) -> Result<()> {
-    /// let meta = op
-    ///     .metadata(&entry, Metakey::ContentLength | Metakey::ContentType)
-    ///     .await?;
-    /// // content length MUST be correct.
-    /// let _ = meta.content_length();
-    /// // etag COULD be correct.
-    /// let _ = meta.etag();
-    /// # Ok(())
-    /// # }
-    /// ```
-    ///
-    /// ## Query all metadata
-    ///
-    /// By querying metadata with `Complete`, we can make sure that we have 
fetched all metadata of this entry.
-    ///
-    /// ```
-    /// # use anyhow::Result;
-    /// # use opendal::Operator;
-    /// use opendal::Entry;
-    /// use opendal::Metakey;
-    ///
-    /// # #[tokio::main]
-    /// # async fn test(op: Operator, entry: Entry) -> Result<()> {
-    /// let meta = op.metadata(&entry, Metakey::Complete).await?;
-    /// // content length MUST be correct.
-    /// let _ = meta.content_length();
-    /// // etag MUST be correct.
-    /// let _ = meta.etag();
-    /// # Ok(())
-    /// # }
-    /// ```
-    pub async fn metadata(
-        &self,
-        entry: &Entry,
-        flags: impl Into<FlagSet<Metakey>>,
-    ) -> Result<Metadata> {
-        // Check if cached metadata saticifies the query.
-        if let Some(meta) = entry.metadata() {
-            if meta.bit().contains(flags) || 
meta.bit().contains(Metakey::Complete) {
-                return Ok(meta.clone());
-            }
-        }
-
-        // Else request from backend..
-        let meta = self.stat(entry.path()).await?;
-        Ok(meta)
-    }
-
     /// Check if this path exists or not.
     ///
     /// # Example
@@ -1270,13 +1163,13 @@ impl Operator {
     ///
     /// # Notes
     ///
-    /// ## For listing recursively
+    /// ## Listing recursively
     ///
     /// This function only read the children of the given directory. To read
     /// all entries recursively, use 
`Operator::list_with("path").delimiter("")`
     /// instead.
     ///
-    /// ## For streaming
+    /// ## Streaming
     ///
     /// This function will read all entries in the given directory. It could
     /// take very long time and consume a lot of memory if the directory
@@ -1285,6 +1178,11 @@ impl Operator {
     /// In order to avoid this, you can use [`Operator::lister`] to list 
entries in
     /// a streaming way.
     ///
+    /// ## Metadata
+    ///
+    /// The only metadata that is guaranteed to be available is the `Mode`.
+    /// For fetching more metadata, please use [`Operator::list_with`] and 
`metakey`.
+    ///
     /// # Examples
     ///
     /// ```no_run
@@ -1296,13 +1194,12 @@ impl Operator {
     /// # async fn test(op: Operator) -> Result<()> {
     /// let mut entries = op.list("path/to/dir/").await?;
     /// for entry in entries {
-    ///     let meta = op.metadata(&entry, Metakey::Mode).await?;
-    ///     match meta.mode() {
+    ///     match entry.metadata().mode() {
     ///         EntryMode::FILE => {
     ///             println!("Handling file")
     ///         }
     ///         EntryMode::DIR => {
-    ///             println!("Handling dir like start a new list via 
meta.path()")
+    ///             println!("Handling dir {}", entry.path())
     ///         }
     ///         EntryMode::Unknown => continue,
     ///     }
@@ -1327,6 +1224,11 @@ impl Operator {
     /// In order to avoid this, you can use [`Operator::lister`] to list 
entries in
     /// a streaming way.
     ///
+    /// ## Metadata
+    ///
+    /// The only metadata that is guaranteed to be available is the `Mode`.
+    /// For fetching more metadata, please specify the `metakey`.
+    ///
     /// # Examples
     ///
     /// ## List entries with prefix
@@ -1342,8 +1244,7 @@ impl Operator {
     /// # async fn test(op: Operator) -> Result<()> {
     /// let mut entries = op.list_with("prefix/").delimiter("").await?;
     /// for entry in entries {
-    ///     let meta = op.metadata(&entry, Metakey::Mode).await?;
-    ///     match meta.mode() {
+    ///     match entry.metadata().mode() {
     ///         EntryMode::FILE => {
     ///             println!("Handling file")
     ///         }
@@ -1356,6 +1257,39 @@ impl Operator {
     /// # Ok(())
     /// # }
     /// ```
+    ///
+    /// ## List entries with metakey for more metadata
+    ///
+    /// ```no_run
+    /// # use anyhow::Result;
+    /// use opendal::EntryMode;
+    /// use opendal::Metakey;
+    /// use opendal::Operator;
+    /// # #[tokio::main]
+    /// # async fn test(op: Operator) -> Result<()> {
+    /// let mut entries = op
+    ///     .list_with("dir/")
+    ///     .metakey(Metakey::ContentLength | Metakey::LastModified)
+    ///     .await?;
+    /// for entry in entries {
+    ///     let meta = entry.metadata();
+    ///     match meta.mode() {
+    ///         EntryMode::FILE => {
+    ///             println!(
+    ///                 "Handling file {} with size {}",
+    ///                 entry.path(),
+    ///                 meta.content_length()
+    ///             )
+    ///         }
+    ///         EntryMode::DIR => {
+    ///             println!("Handling dir {}", entry.path())
+    ///         }
+    ///         EntryMode::Unknown => continue,
+    ///     }
+    /// }
+    /// # Ok(())
+    /// # }
+    /// ```
     pub fn list_with(&self, path: &str) -> FutureList {
         let path = normalize_path(path);
 
@@ -1375,8 +1309,7 @@ impl Operator {
                         .with_context("path", &path));
                     }
 
-                    let (_, pager) = inner.list(&path, args).await?;
-                    let lister = Lister::new(pager);
+                    let lister = Lister::create(inner, &path, args).await?;
 
                     lister.try_collect().await
                 };
@@ -1392,6 +1325,19 @@ impl Operator {
     ///
     /// An error will be returned if given path doesn't end with `/`.
     ///
+    /// # Notes
+    ///
+    /// ## Listing recursively
+    ///
+    /// This function only reads the children of the given directory. To read
+    /// all entries recursively, use [`Operator::lister_with`] and 
`delimiter("")`
+    /// instead.
+    ///
+    /// ## Metadata
+    ///
+    /// The only metadata that is guaranteed to be available is the `Mode`.
+    /// For fetching more metadata, please use [`Operator::lister_with`] and 
`metakey`.
+    ///
     /// # Examples
     ///
     /// ```no_run
@@ -1405,8 +1351,7 @@ impl Operator {
     /// # async fn test(op: Operator) -> Result<()> {
     /// let mut ds = op.lister("path/to/dir/").await?;
     /// while let Some(mut de) = ds.try_next().await? {
-    ///     let meta = op.metadata(&de, Metakey::Mode).await?;
-    ///     match meta.mode() {
+    ///     match de.metadata().mode() {
     ///         EntryMode::FILE => {
     ///             println!("Handling file")
     ///         }
@@ -1447,14 +1392,13 @@ impl Operator {
     ///     .limit(10)
     ///     .start_after("start")
     ///     .await?;
-    /// while let Some(mut de) = ds.try_next().await? {
-    ///     let meta = op.metadata(&de, Metakey::Mode).await?;
-    ///     match meta.mode() {
+    /// while let Some(mut entry) = ds.try_next().await? {
+    ///     match entry.metadata().mode() {
     ///         EntryMode::FILE => {
-    ///             println!("Handling file")
+    ///             println!("Handling file {}", entry.path())
     ///         }
     ///         EntryMode::DIR => {
-    ///             println!("Handling dir like start a new list via 
meta.path()")
+    ///             println!("Handling dir {}", entry.path())
     ///         }
     ///         EntryMode::Unknown => continue,
     ///     }
@@ -1475,14 +1419,48 @@ impl Operator {
     /// # #[tokio::main]
     /// # async fn test(op: Operator) -> Result<()> {
     /// let mut ds = op.lister_with("path/to/dir/").delimiter("").await?;
-    /// while let Some(mut de) = ds.try_next().await? {
-    ///     let meta = op.metadata(&de, Metakey::Mode).await?;
+    /// while let Some(mut entry) = ds.try_next().await? {
+    ///     match entry.metadata().mode() {
+    ///         EntryMode::FILE => {
+    ///             println!("Handling file {}", entry.path())
+    ///         }
+    ///         EntryMode::DIR => {
+    ///             println!("Handling dir {}", entry.path())
+    ///         }
+    ///         EntryMode::Unknown => continue,
+    ///     }
+    /// }
+    /// # Ok(())
+    /// # }
+    /// ```
+    ///
+    /// ## List files with required metadata
+    ///
+    /// ```no_run
+    /// # use anyhow::Result;
+    /// # use futures::io;
+    /// use futures::TryStreamExt;
+    /// use opendal::EntryMode;
+    /// use opendal::Metakey;
+    /// use opendal::Operator;
+    /// # #[tokio::main]
+    /// # async fn test(op: Operator) -> Result<()> {
+    /// let mut ds = op
+    ///     .lister_with("path/to/dir/")
+    ///     .metakey(Metakey::ContentLength | Metakey::LastModified)
+    ///     .await?;
+    /// while let Some(mut entry) = ds.try_next().await? {
+    ///     let meta = entry.metadata();
     ///     match meta.mode() {
     ///         EntryMode::FILE => {
-    ///             println!("Handling file")
+    ///             println!(
+    ///                 "Handling file {} with size {}",
+    ///                 entry.path(),
+    ///                 meta.content_length()
+    ///             )
     ///         }
     ///         EntryMode::DIR => {
-    ///             println!("Handling dir like start a new list via 
meta.path()")
+    ///             println!("Handling dir {}", entry.path())
     ///         }
     ///         EntryMode::Unknown => continue,
     ///     }
@@ -1509,9 +1487,7 @@ impl Operator {
                         .with_context("path", &path));
                     }
 
-                    let (_, pager) = inner.list(&path, args).await?;
-
-                    Ok(Lister::new(pager))
+                    Lister::create(inner, &path, args).await
                 };
                 Box::pin(fut)
             },
diff --git a/core/src/types/operator/operator_futures.rs 
b/core/src/types/operator/operator_futures.rs
index 68e646788..42ec83311 100644
--- a/core/src/types/operator/operator_futures.rs
+++ b/core/src/types/operator/operator_futures.rs
@@ -27,6 +27,7 @@ use std::task::Poll;
 use std::time::Duration;
 
 use bytes::Bytes;
+use flagset::FlagSet;
 use futures::future::BoxFuture;
 use futures::Future;
 use futures::FutureExt;
@@ -582,6 +583,12 @@ impl FutureList {
         self.0 = self.0.map_args(|args| args.with_delimiter(v));
         self
     }
+
+    /// Change the metakey. The default metakey is `Metakey::Mode`.
+    pub fn metakey(mut self, v: impl Into<FlagSet<Metakey>>) -> Self {
+        self.0 = self.0.map_args(|args| args.with_metakey(v));
+        self
+    }
 }
 
 impl Future for FutureList {
@@ -615,6 +622,12 @@ impl FutureLister {
         self.0 = self.0.map_args(|args| args.with_delimiter(v));
         self
     }
+
+    /// Change the metakey. The default metakey is `Metakey::Mode`.
+    pub fn metakey(mut self, v: impl Into<FlagSet<Metakey>>) -> Self {
+        self.0 = self.0.map_args(|args| args.with_metakey(v));
+        self
+    }
 }
 
 impl Future for FutureLister {
diff --git a/core/tests/behavior/fuzz.rs b/core/tests/behavior/fuzz.rs
index 9e504eaff..cca74763b 100644
--- a/core/tests/behavior/fuzz.rs
+++ b/core/tests/behavior/fuzz.rs
@@ -15,8 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::io;
 use std::io::SeekFrom;
-use std::{io, vec};
+use std::vec;
 
 use anyhow::Result;
 use futures::AsyncSeekExt;
diff --git a/core/tests/behavior/list.rs b/core/tests/behavior/list.rs
index ed7067600..79a36fe87 100644
--- a/core/tests/behavior/list.rs
+++ b/core/tests/behavior/list.rs
@@ -37,6 +37,8 @@ pub fn behavior_list_tests(op: &Operator) -> Vec<Trial> {
         op,
         test_check,
         test_list_dir,
+        test_list_dir_with_metakey,
+        test_list_dir_with_metakey_complete,
         test_list_rich_dir,
         test_list_empty_dir,
         test_list_non_exist_dir,
@@ -84,6 +86,95 @@ pub async fn test_list_dir(op: Operator) -> Result<()> {
     Ok(())
 }
 
+/// List dir with metakey
+pub async fn test_list_dir_with_metakey(op: Operator) -> Result<()> {
+    let parent = uuid::Uuid::new_v4().to_string();
+    let path = format!("{parent}/{}", uuid::Uuid::new_v4());
+    debug!("Generate a random file: {}", &path);
+    let (content, size) = gen_bytes();
+
+    op.write(&path, content).await.expect("write must succeed");
+
+    let mut obs = op
+        .lister_with(&format!("{parent}/"))
+        .metakey(
+            Metakey::Mode
+                | Metakey::CacheControl
+                | Metakey::ContentDisposition
+                | Metakey::ContentLength
+                | Metakey::ContentMd5
+                | Metakey::ContentRange
+                | Metakey::ContentType
+                | Metakey::Etag
+                | Metakey::LastModified
+                | Metakey::Version,
+        )
+        .await?;
+    let mut found = false;
+    while let Some(de) = obs.try_next().await? {
+        let meta = de.metadata();
+        if de.path() == path {
+            assert_eq!(meta.mode(), EntryMode::FILE);
+            assert_eq!(meta.content_length(), size as u64);
+
+            // We don't care about the value, we just want to check there is no 
panic.
+            let _ = meta.cache_control();
+            let _ = meta.content_disposition();
+            let _ = meta.content_md5();
+            let _ = meta.content_range();
+            let _ = meta.content_type();
+            let _ = meta.etag();
+            let _ = meta.last_modified();
+            let _ = meta.version();
+
+            found = true
+        }
+    }
+    assert!(found, "file should be found in list");
+
+    op.delete(&path).await.expect("delete must succeed");
+    Ok(())
+}
+
+/// List dir with metakey complete
+pub async fn test_list_dir_with_metakey_complete(op: Operator) -> Result<()> {
+    let parent = uuid::Uuid::new_v4().to_string();
+    let path = format!("{parent}/{}", uuid::Uuid::new_v4());
+    debug!("Generate a random file: {}", &path);
+    let (content, size) = gen_bytes();
+
+    op.write(&path, content).await.expect("write must succeed");
+
+    let mut obs = op
+        .lister_with(&format!("{parent}/"))
+        .metakey(Metakey::Complete)
+        .await?;
+    let mut found = false;
+    while let Some(de) = obs.try_next().await? {
+        let meta = de.metadata();
+        if de.path() == path {
+            assert_eq!(meta.mode(), EntryMode::FILE);
+            assert_eq!(meta.content_length(), size as u64);
+
+            // We don't care about the value, we just want to check there is no 
panic.
+            let _ = meta.cache_control();
+            let _ = meta.content_disposition();
+            let _ = meta.content_md5();
+            let _ = meta.content_range();
+            let _ = meta.content_type();
+            let _ = meta.etag();
+            let _ = meta.last_modified();
+            let _ = meta.version();
+
+            found = true
+        }
+    }
+    assert!(found, "file should be found in list");
+
+    op.delete(&path).await.expect("delete must succeed");
+    Ok(())
+}
+
 /// listing a directory, which contains more objects than a single page can 
take.
 pub async fn test_list_rich_dir(op: Operator) -> Result<()> {
     op.create_dir("test_list_rich_dir/").await?;

Reply via email to