This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch polish-lister
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit c6123160467564fbe84ec46de3cb8751f11553c2
Author: Xuanwo <[email protected]>
AuthorDate: Wed Nov 15 15:28:13 2023 +0800

    Save work
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/blocking.rs                   |   3 +-
 core/src/layers/complete.rs                   |  11 +--
 core/src/layers/concurrent_limit.rs           |   4 +-
 core/src/layers/error_context.rs              |   4 +-
 core/src/layers/immutable_index.rs            |   6 +-
 core/src/layers/logging.rs                    |  12 +--
 core/src/layers/madsim.rs                     |   8 +-
 core/src/layers/minitrace.rs                  |  12 +--
 core/src/layers/oteltrace.rs                  |   4 +-
 core/src/layers/retry.rs                      |  76 +++++++++------
 core/src/layers/timeout.rs                    |  33 +++++--
 core/src/layers/tracing.rs                    |   4 +-
 core/src/raw/adapters/kv/backend.rs           |  54 ++++++-----
 core/src/raw/adapters/typed_kv/backend.rs     |  53 +++++-----
 core/src/raw/oio/list/api.rs                  |  19 ++--
 core/src/raw/oio/list/into_flat_page.rs       | 106 ++++++++++----------
 core/src/raw/oio/list/into_hierarchy_pager.rs |  26 ++---
 core/src/raw/oio/list/mod.rs                  |   5 +
 core/src/raw/oio/list/page_list.rs            | 121 +++++++++++++++++++++++
 core/src/services/alluxio/backend.rs          |   8 +-
 core/src/services/alluxio/lister.rs           |  28 ++----
 core/src/services/azblob/backend.rs           |   6 +-
 core/src/services/azblob/lister.rs            |  27 ++----
 core/src/services/azdls/backend.rs            |   6 +-
 core/src/services/azdls/lister.rs             |  44 +++------
 core/src/services/azfile/backend.rs           |   6 +-
 core/src/services/azfile/lister.rs            |  49 +++-------
 core/src/services/cos/backend.rs              |   8 +-
 core/src/services/cos/lister.rs               |  31 ++----
 core/src/services/dbfs/backend.rs             |   6 +-
 core/src/services/dbfs/lister.rs              |  28 ++----
 core/src/services/fs/lister.rs                |  61 ++++++++----
 core/src/services/ftp/lister.rs               |  49 +++++-----
 core/src/services/gcs/backend.rs              |  21 ++--
 core/src/services/gcs/lister.rs               |  34 +++----
 core/src/services/gdrive/backend.rs           |   8 +-
 core/src/services/gdrive/core.rs              |  25 ++---
 core/src/services/gdrive/lister.rs            |  72 +++++---------
 core/src/services/hdfs/lister.rs              |  49 +++++-----
 core/src/services/ipfs/backend.rs             |  26 ++---
 core/src/services/ipmfs/backend.rs            |   8 +-
 core/src/services/ipmfs/lister.rs             |  53 ++++------
 core/src/services/obs/backend.rs              |   8 +-
 core/src/services/obs/lister.rs               |  27 ++----
 core/src/services/onedrive/backend.rs         |   7 +-
 core/src/services/onedrive/lister.rs          | 135 +++++++++++---------------
 core/src/services/oss/backend.rs              |  20 ++--
 core/src/services/oss/core.rs                 |  14 +--
 core/src/services/oss/lister.rs               |  34 +++----
 core/src/services/s3/backend.rs               |  20 ++--
 core/src/services/s3/lister.rs                |  35 +++----
 core/src/services/sftp/lister.rs              |  19 ++--
 core/src/services/swift/backend.rs            |   6 +-
 core/src/services/swift/lister.rs             |  22 ++---
 core/src/services/webdav/backend.rs           |   9 +-
 core/src/services/webdav/lister.rs            |  32 +++---
 core/src/services/webhdfs/backend.rs          |  75 +++-----------
 core/src/services/webhdfs/lister.rs           | 110 +++++++++------------
 core/src/types/list.rs                        |  72 ++++++--------
 59 files changed, 872 insertions(+), 987 deletions(-)

diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs
index 67281ba25..3181062e7 100644
--- a/core/src/layers/blocking.rs
+++ b/core/src/layers/blocking.rs
@@ -315,7 +315,8 @@ impl<I: oio::Write + 'static> oio::BlockingWrite for 
BlockingWrapper<I> {
 
 impl<I: oio::List> oio::BlockingList for BlockingWrapper<I> {
     fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        self.handle.block_on(self.inner.next())
+        todo!()
+        // self.handle.block_on(poll_fn(|cx| self.inner.poll_next(cx)))
     }
 }
 
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 7fd2037dc..ea4906cf6 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -27,7 +27,6 @@ use std::task::Poll;
 use async_trait::async_trait;
 use bytes::Bytes;
 
-use crate::raw::oio::Entry;
 use crate::raw::oio::FlatLister;
 use crate::raw::oio::HierarchyLister;
 use crate::raw::oio::RangeReader;
@@ -639,13 +638,13 @@ where
     A: Accessor<Lister = P>,
     P: oio::List,
 {
-    async fn next(&mut self) -> Result<Option<Vec<Entry>>> {
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
         use CompleteLister::*;
 
         match self {
-            AlreadyComplete(p) => p.next().await,
-            NeedFlat(p) => p.next().await,
-            NeedHierarchy(p) => p.next().await,
+            AlreadyComplete(p) => p.poll_next(cx),
+            NeedFlat(p) => p.poll_next(cx),
+            NeedHierarchy(p) => p.poll_next(cx),
         }
     }
 }
@@ -655,7 +654,7 @@ where
     A: Accessor<BlockingLister = P>,
     P: oio::BlockingList,
 {
-    fn next(&mut self) -> Result<Option<Vec<Entry>>> {
+    fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
         use CompleteLister::*;
 
         match self {
diff --git a/core/src/layers/concurrent_limit.rs 
b/core/src/layers/concurrent_limit.rs
index 00f9a9a8c..d839b97d9 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -310,8 +310,8 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
ConcurrentLimitWrapper<R> {
 
 #[async_trait]
 impl<R: oio::List> oio::List for ConcurrentLimitWrapper<R> {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        self.inner.next().await
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
+        self.inner.poll_next(cx)
     }
 }
 
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 3278d502c..c3a0feef8 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -453,8 +453,8 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for 
ErrorContextWrapper<T> {
 
 #[async_trait::async_trait]
 impl<T: oio::List> oio::List for ErrorContextWrapper<T> {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        self.inner.next().await.map_err(|err| {
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
+        self.inner.poll_next(cx).map_err(|err| {
             err.with_operation(ListOperation::Next)
                 .with_context("service", self.scheme)
                 .with_context("path", &self.path)
diff --git a/core/src/layers/immutable_index.rs 
b/core/src/layers/immutable_index.rs
index 6137687b8..9d47d78ff 100644
--- a/core/src/layers/immutable_index.rs
+++ b/core/src/layers/immutable_index.rs
@@ -18,6 +18,7 @@
 use std::collections::HashSet;
 use std::fmt::Debug;
 use std::mem;
+use std::task::{Context, Poll};
 
 use async_trait::async_trait;
 
@@ -240,8 +241,9 @@ impl ImmutableDir {
 
 #[async_trait]
 impl oio::List for ImmutableDir {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        Ok(self.inner_next_page())
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
+        todo!()
+        // Ok(self.inner_next_page())
     }
 }
 
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index b556cc7aa..16b91cef5 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1476,18 +1476,18 @@ impl<P> Drop for LoggingLister<P> {
 
 #[async_trait]
 impl<P: oio::List> oio::List for LoggingLister<P> {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        let res = self.inner.next().await;
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
+        let res = ready!(self.inner.poll_next(cx));
 
         match &res {
-            Ok(Some(des)) => {
+            Ok(Some(de)) => {
                 debug!(
                     target: LOGGING_TARGET,
-                    "service={} operation={} path={} -> listed {} entries",
+                    "service={} operation={} path={} -> listed entry: {}",
                     self.ctx.scheme,
                     self.op,
                     self.path,
-                    des.len(),
+                    de.path(),
                 );
             }
             Ok(None) => {
@@ -1515,7 +1515,7 @@ impl<P: oio::List> oio::List for LoggingLister<P> {
             }
         };
 
-        res
+        Poll::Ready(res)
     }
 }
 
diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs
index 9eb75da2d..1b5b602aa 100644
--- a/core/src/layers/madsim.rs
+++ b/core/src/layers/madsim.rs
@@ -39,8 +39,6 @@ use madsim::net::Endpoint;
 #[cfg(madsim)]
 use madsim::net::Payload;
 
-use crate::raw::oio;
-use crate::raw::oio::Entry;
 use crate::raw::*;
 use crate::*;
 
@@ -335,11 +333,11 @@ pub struct MadsimLister {}
 
 #[async_trait]
 impl oio::List for MadsimLister {
-    async fn next(&mut self) -> crate::Result<Option<Vec<Entry>>> {
-        Err(Error::new(
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<crate::Result<Option<oio::Entry>>> {
+        Poll::Ready(Err(Error::new(
             ErrorKind::Unsupported,
             "will be supported in the future",
-        ))
+        )))
     }
 }
 
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index 26df33004..b0f192ed3 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -372,14 +372,10 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
MinitraceWrapper<R> {
 
 #[async_trait]
 impl<R: oio::List> oio::List for MinitraceWrapper<R> {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        self.inner
-            .next()
-            .in_span(Span::enter_with_parent(
-                ListOperation::Next.into_static(),
-                &self.span,
-            ))
-            .await
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
+        let _g = self.span.set_local_parent();
+        let _span = 
LocalSpan::enter_with_local_parent(ListOperation::Next.into_static());
+        self.inner.poll_next(cx)
     }
 }
 
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index 0244e3190..c4d8b6f0e 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -331,8 +331,8 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
OtelTraceWrapper<R> {
 
 #[async_trait]
 impl<R: oio::List> oio::List for OtelTraceWrapper<R> {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        self.inner.next().await
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
+        self.inner.poll_next(cx)
     }
 }
 
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index d0162ccdf..67528347d 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -1051,28 +1051,48 @@ impl<R: oio::BlockingWrite, I: RetryInterceptor> 
oio::BlockingWrite for RetryWra
 
 #[async_trait]
 impl<P: oio::List, I: RetryInterceptor> oio::List for RetryWrapper<P, I> {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        let mut backoff = self.builder.build();
-
-        loop {
-            match self.inner.next().await {
-                Ok(v) => return Ok(v),
-                Err(e) if !e.is_temporary() => return Err(e),
-                Err(e) => match backoff.next() {
-                    None => return Err(e),
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
+        if let Some(sleep) = self.sleep.as_mut() {
+            ready!(sleep.poll_unpin(cx));
+            self.sleep = None;
+        }
+
+        match ready!(self.inner.poll_next(cx)) {
+            Ok(v) => {
+                self.current_backoff = None;
+                Poll::Ready(Ok(v))
+            }
+            Err(err) if !err.is_temporary() => {
+                self.current_backoff = None;
+                Poll::Ready(Err(err))
+            }
+            Err(err) => {
+                let backoff = match self.current_backoff.as_mut() {
+                    Some(backoff) => backoff,
+                    None => {
+                        self.current_backoff = Some(self.builder.build());
+                        self.current_backoff.as_mut().unwrap()
+                    }
+                };
+
+                match backoff.next() {
+                    None => {
+                        self.current_backoff = None;
+                        Poll::Ready(Err(err))
+                    }
                     Some(dur) => {
                         self.notify.intercept(
-                            &e,
+                            &err,
                             dur,
                             &[
                                 ("operation", 
ListOperation::Next.into_static()),
                                 ("path", &self.path),
                             ],
                         );
-                        tokio::time::sleep(dur).await;
-                        continue;
+                        self.sleep = Some(Box::pin(tokio::time::sleep(dur)));
+                        self.poll_next(cx)
                     }
-                },
+                }
             }
         }
     }
@@ -1296,37 +1316,33 @@ mod tests {
     }
     #[async_trait]
     impl oio::List for MockLister {
-        async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
+        fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
             self.attempt += 1;
-            match self.attempt {
+            let result = match self.attempt {
                 1 => Err(Error::new(
                     ErrorKind::RateLimited,
                     "retryable rate limited error from lister",
                 )
                 .set_temporary()),
-                2 => {
-                    let entries = vec![
-                        oio::Entry::new("hello", 
Metadata::new(EntryMode::FILE)),
-                        oio::Entry::new("world", 
Metadata::new(EntryMode::FILE)),
-                    ];
-                    Ok(Some(entries))
-                }
+                2 => Ok(Some(oio::Entry::new(
+                    "hello",
+                    Metadata::new(EntryMode::FILE),
+                ))),
                 3 => Err(
                     Error::new(ErrorKind::Unexpected, "retryable internal 
server error")
                         .set_temporary(),
                 ),
-                4 => {
-                    let entries = vec![
-                        oio::Entry::new("2023/", 
Metadata::new(EntryMode::DIR)),
-                        oio::Entry::new("0208/", 
Metadata::new(EntryMode::DIR)),
-                    ];
-                    Ok(Some(entries))
-                }
+                4 => Ok(Some(oio::Entry::new(
+                    "2023/",
+                    Metadata::new(EntryMode::DIR),
+                ))),
                 5 => Ok(None),
                 _ => {
                     unreachable!()
                 }
-            }
+            };
+
+            Poll::Ready(result)
         }
     }
 
diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs
index 64b55ff18..8f04c3363 100644
--- a/core/src/layers/timeout.rs
+++ b/core/src/layers/timeout.rs
@@ -417,14 +417,33 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
 
 #[async_trait]
 impl<R: oio::List> oio::List for TimeoutWrapper<R> {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        tokio::time::timeout(self.timeout, self.inner.next())
-            .await
-            .map_err(|_| {
-                Error::new(ErrorKind::Unexpected, "operation timeout")
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
+        match self.start {
+            Some(start) => {
+                if start.elapsed() > self.timeout {
+                    // Clean up the start time before return ready.
+                    self.start = None;
+
+                    return Poll::Ready(Err(Error::new(
+                        ErrorKind::Unexpected,
+                        "operation timeout",
+                    )
                     .with_operation(ListOperation::Next)
                     .with_context("timeout", 
self.timeout.as_secs_f64().to_string())
-                    .set_temporary()
-            })?
+                    .set_temporary()));
+                }
+            }
+            None => {
+                self.start = Some(Instant::now());
+            }
+        }
+
+        match self.inner.poll_next(cx) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(v) => {
+                self.start = None;
+                Poll::Ready(v)
+            }
+        }
     }
 }
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 418be8235..38ae5f2ad 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -366,8 +366,8 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for 
TracingWrapper<R> {
 #[async_trait]
 impl<R: oio::List> oio::List for TracingWrapper<R> {
     #[tracing::instrument(parent = &self.span, level = "debug", skip_all)]
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        self.inner.next().await
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
+        self.inner.poll_next(cx)
     }
 }
 
diff --git a/core/src/raw/adapters/kv/backend.rs 
b/core/src/raw/adapters/kv/backend.rs
index 4360e6f39..b5ddf9440 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
 use std::task::ready;
 use std::task::Context;
 use std::task::Poll;
+use std::vec::IntoIter;
 
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -303,47 +304,52 @@ where
 
 pub struct KvLister {
     root: String,
-    inner: Option<Vec<String>>,
+    inner: IntoIter<String>,
 }
 
 impl KvLister {
     fn new(root: &str, inner: Vec<String>) -> Self {
         Self {
             root: root.to_string(),
-            inner: Some(inner),
+            inner: inner.into_iter(),
         }
     }
-
-    fn inner_next_page(&mut self) -> Option<Vec<oio::Entry>> {
-        let res = self
-            .inner
-            .take()?
-            .into_iter()
-            .map(|v| {
-                let mode = if v.ends_with('/') {
-                    EntryMode::DIR
-                } else {
-                    EntryMode::FILE
-                };
-
-                oio::Entry::new(&build_rel_path(&self.root, &v), 
Metadata::new(mode))
-            })
-            .collect();
-
-        Some(res)
-    }
 }
 
 #[async_trait]
 impl oio::List for KvLister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        Ok(self.inner_next_page())
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
+        let entry = self.inner.next().map(|v| {
+            let mode = if v.ends_with('/') {
+                EntryMode::DIR
+            } else {
+                EntryMode::FILE
+            };
+
+            oio::Entry::new(&build_rel_path(&self.root, &v), 
Metadata::new(mode))
+        });
+
+        Poll::Ready(Ok(entry))
     }
 }
 
 impl oio::BlockingList for KvLister {
     fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        Ok(self.inner_next_page())
+        Ok(Some(
+            self.inner
+                .as_slice()
+                .iter()
+                .map(|v| {
+                    let mode = if v.ends_with('/') {
+                        EntryMode::DIR
+                    } else {
+                        EntryMode::FILE
+                    };
+
+                    oio::Entry::new(&build_rel_path(&self.root, &v), 
Metadata::new(mode))
+                })
+                .collect(),
+        ))
     }
 }
 
diff --git a/core/src/raw/adapters/typed_kv/backend.rs 
b/core/src/raw/adapters/typed_kv/backend.rs
index 2a1da5966..e60e21649 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
 use std::task::ready;
 use std::task::Context;
 use std::task::Poll;
+use std::vec::IntoIter;
 
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -304,47 +305,51 @@ where
 
 pub struct KvLister {
     root: String,
-    inner: Option<Vec<String>>,
+    inner: IntoIter<String>,
 }
 
 impl KvLister {
     fn new(root: &str, inner: Vec<String>) -> Self {
         Self {
             root: root.to_string(),
-            inner: Some(inner),
+            inner: inner.into_iter(),
         }
     }
-
-    fn inner_next_page(&mut self) -> Option<Vec<oio::Entry>> {
-        let res = self
-            .inner
-            .take()?
-            .into_iter()
-            .map(|v| {
-                let mode = if v.ends_with('/') {
-                    EntryMode::DIR
-                } else {
-                    EntryMode::FILE
-                };
-
-                oio::Entry::new(&build_rel_path(&self.root, &v), 
Metadata::new(mode))
-            })
-            .collect();
-
-        Some(res)
-    }
 }
 
 #[async_trait]
 impl oio::List for KvLister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        Ok(self.inner_next_page())
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
+        let entry = self.inner.next().map(|v| {
+            let mode = if v.ends_with('/') {
+                EntryMode::DIR
+            } else {
+                EntryMode::FILE
+            };
+
+            oio::Entry::new(&build_rel_path(&self.root, &v), 
Metadata::new(mode))
+        });
+
+        Poll::Ready(Ok(entry))
     }
 }
 
 impl oio::BlockingList for KvLister {
     fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        Ok(self.inner_next_page())
+        Ok(Some(
+            self.inner
+                .clone()
+                .map(|v| {
+                    let mode = if v.ends_with('/') {
+                        EntryMode::DIR
+                    } else {
+                        EntryMode::FILE
+                    };
+
+                    oio::Entry::new(&build_rel_path(&self.root, &v), 
Metadata::new(mode))
+                })
+                .collect(),
+        ))
     }
 }
 
diff --git a/core/src/raw/oio/list/api.rs b/core/src/raw/oio/list/api.rs
index c08f5d294..b7899e60d 100644
--- a/core/src/raw/oio/list/api.rs
+++ b/core/src/raw/oio/list/api.rs
@@ -17,6 +17,7 @@
 
 use std::fmt::Display;
 use std::fmt::Formatter;
+use std::task::{Context, Poll};
 
 use async_trait::async_trait;
 
@@ -27,7 +28,7 @@ use crate::*;
 #[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)]
 #[non_exhaustive]
 pub enum ListOperation {
-    /// Operation for [`List::next`]
+    /// Operation for [`List::poll_next`]
     Next,
     /// Operation for [`BlockingList::next`]
     BlockingNext,
@@ -64,7 +65,7 @@ pub trait List: Send + Sync + 'static {
     ///
     /// `Ok(None)` means all pages have been returned. Any following call
     /// to `next` will always get the same result.
-    async fn next(&mut self) -> Result<Option<Vec<Entry>>>;
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<Entry>>>;
 }
 
 /// The boxed version of [`List`]
@@ -72,24 +73,24 @@ pub type Lister = Box<dyn List>;
 
 #[async_trait]
 impl<P: List + ?Sized> List for Box<P> {
-    async fn next(&mut self) -> Result<Option<Vec<Entry>>> {
-        (**self).next().await
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<Entry>>> {
+        (**self).poll_next(cx)
     }
 }
 
 #[async_trait]
 impl List for () {
-    async fn next(&mut self) -> Result<Option<Vec<Entry>>> {
-        Ok(None)
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<Entry>>> {
+        Poll::Ready(Ok(None))
     }
 }
 
 #[async_trait]
 impl<P: List> List for Option<P> {
-    async fn next(&mut self) -> Result<Option<Vec<Entry>>> {
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<Entry>>> {
         match self {
-            Some(p) => p.next().await,
-            None => Ok(None),
+            Some(p) => p.poll_next(cx),
+            None => Poll::Ready(Ok(None)),
         }
     }
 }
diff --git a/core/src/raw/oio/list/into_flat_page.rs 
b/core/src/raw/oio/list/into_flat_page.rs
index ee9966b4c..6c2ef6207 100644
--- a/core/src/raw/oio/list/into_flat_page.rs
+++ b/core/src/raw/oio/list/into_flat_page.rs
@@ -17,6 +17,7 @@
 
 use std::collections::VecDeque;
 use std::mem;
+use std::task::{Context, Poll};
 
 use async_trait::async_trait;
 
@@ -95,58 +96,59 @@ where
     A: Accessor<Lister = P>,
     P: oio::List,
 {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        loop {
-            if let Some(de) = self.dirs.pop_back() {
-                let (_, op) = self.acc.list(de.path(), OpList::new()).await?;
-                self.listers.push((op, de, vec![]))
-            }
-
-            let (mut lister, de, mut buf) = match self.listers.pop() {
-                Some((lister, de, buf)) => (lister, de, buf),
-                None => {
-                    if !self.res.is_empty() {
-                        return Ok(Some(mem::take(&mut self.res)));
-                    }
-                    return Ok(None);
-                }
-            };
-
-            if buf.is_empty() {
-                match lister.next().await? {
-                    Some(v) => {
-                        buf = v;
-                    }
-                    None => {
-                        // Only push entry if it's not root dir
-                        if de.path() != self.root {
-                            self.res.push(de);
-                        }
-                        continue;
-                    }
-                }
-            }
-
-            let mut buf = VecDeque::from(buf);
-            loop {
-                if let Some(oe) = buf.pop_front() {
-                    if oe.mode().is_dir() {
-                        self.dirs.push_back(oe);
-                        self.listers.push((lister, de, buf.into()));
-                        break;
-                    } else {
-                        self.res.push(oe)
-                    }
-                } else {
-                    self.listers.push((lister, de, vec![]));
-                    break;
-                }
-            }
-
-            if self.res.len() >= self.size {
-                return Ok(Some(mem::take(&mut self.res)));
-            }
-        }
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
+        todo!()
+        // loop {
+        //     if let Some(de) = self.dirs.pop_back() {
+        //         let (_, op) = self.acc.list(de.path(), 
OpList::new()).await?;
+        //         self.listers.push((op, de, vec![]))
+        //     }
+        //
+        //     let (mut lister, de, mut buf) = match self.listers.pop() {
+        //         Some((lister, de, buf)) => (lister, de, buf),
+        //         None => {
+        //             if !self.res.is_empty() {
+        //                 return Ok(Some(mem::take(&mut self.res)));
+        //             }
+        //             return Ok(None);
+        //         }
+        //     };
+        //
+        //     if buf.is_empty() {
+        //         match lister.poll_next(cx)? {
+        //             Some(v) => {
+        //                 buf = v;
+        //             }
+        //             None => {
+        //                 // Only push entry if it's not root dir
+        //                 if de.path() != self.root {
+        //                     self.res.push(de);
+        //                 }
+        //                 continue;
+        //             }
+        //         }
+        //     }
+        //
+        //     let mut buf = VecDeque::from(buf);
+        //     loop {
+        //         if let Some(oe) = buf.pop_front() {
+        //             if oe.mode().is_dir() {
+        //                 self.dirs.push_back(oe);
+        //                 self.listers.push((lister, de, buf.into()));
+        //                 break;
+        //             } else {
+        //                 self.res.push(oe)
+        //             }
+        //         } else {
+        //             self.listers.push((lister, de, vec![]));
+        //             break;
+        //         }
+        //     }
+        //
+        //     if self.res.len() >= self.size {
+        //         return Ok(Some(mem::take(&mut self.res)));
+        //     }
+        // }
     }
 }
 
diff --git a/core/src/raw/oio/list/into_hierarchy_pager.rs 
b/core/src/raw/oio/list/into_hierarchy_pager.rs
index 2971ad2a1..217371b45 100644
--- a/core/src/raw/oio/list/into_hierarchy_pager.rs
+++ b/core/src/raw/oio/list/into_hierarchy_pager.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::collections::HashSet;
+use std::task::{Context, Poll};
 
 use async_trait::async_trait;
 
@@ -118,18 +119,19 @@ impl<P> HierarchyLister<P> {
 
 #[async_trait]
 impl<P: oio::List> oio::List for HierarchyLister<P> {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        let page = self.lister.next().await?;
-
-        let entries = if let Some(entries) = page {
-            entries
-        } else {
-            return Ok(None);
-        };
-
-        let entries = self.filter_entries(entries);
-
-        Ok(Some(entries))
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
+        todo!()
+        // let page = self.lister.poll_next(cx)?;
+        //
+        // let entries = if let Some(entries) = page {
+        //     entries
+        // } else {
+        //     return Ok(None);
+        // };
+        //
+        // let entries = self.filter_entries(entries);
+        //
+        // Ok(Some(entries))
     }
 }
 
diff --git a/core/src/raw/oio/list/mod.rs b/core/src/raw/oio/list/mod.rs
index 0a4b0e39e..63fe41292 100644
--- a/core/src/raw/oio/list/mod.rs
+++ b/core/src/raw/oio/list/mod.rs
@@ -22,6 +22,11 @@ pub use api::List;
 pub use api::ListOperation;
 pub use api::Lister;
 
+mod page_list;
+pub use page_list::PageContext;
+pub use page_list::PageList;
+pub use page_list::PageLister;
+
 mod into_flat_page;
 pub use into_flat_page::into_flat_page;
 pub use into_flat_page::FlatLister;
diff --git a/core/src/raw/oio/list/page_list.rs 
b/core/src/raw/oio/list/page_list.rs
new file mode 100644
index 000000000..ed14a57d8
--- /dev/null
+++ b/core/src/raw/oio/list/page_list.rs
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use futures::future::BoxFuture;
+use std::collections::VecDeque;
+use std::task::{ready, Context, Poll};
+
+use crate::raw::*;
+use crate::*;
+
+/// PageList is used to implement [`List`] based on API supporting pagination. 
By implementing
+/// PageList, services don't need to care about the details of page list.
+///
+/// # Architecture
+///
+/// The architecture after adopting [`PageList`]:
+///
+/// - Services impl `PageList`
+/// - `PageLister` impl `List`
+/// - Expose `PageLister` as `Accessor::Lister`
+#[async_trait]
+pub trait PageList: Send + Sync + Unpin + 'static {
+    /// next_page is used to fetch next page of entries from underlying 
storage.
+    async fn next_page(&self, ctx: &mut PageContext) -> Result<()>;
+}
+
+pub struct PageContext {
+    /// done is used to indicate whether the list operation is done.
+    pub done: bool,
+    /// token is used by underlying storage services to fetch next page.
+    pub token: String,
+    /// entries is used to store entries fetched from underlying storage.
+    ///
+    /// Please always reuse the same `VecDeque` to avoid unnecessary memory 
allocation.
+    /// PageLister makes sure that entries is reset before calling 
`next_page`. Implementor
+    /// can calling `push_back` on `entries` directly.
+    pub entries: VecDeque<oio::Entry>,
+}
+
+pub struct PageLister<L: PageList> {
+    state: State<L>,
+}
+
+enum State<L> {
+    Idle(Option<(L, PageContext)>),
+    Fetch(BoxFuture<'static, ((L, PageContext), Result<()>)>),
+}
+
+/// # Safety
+///
+/// We will only take `&mut Self` reference for State.
+unsafe impl<L: PageList> Sync for State<L> {}
+
+impl<L> PageLister<L>
+where
+    L: PageList,
+{
+    /// Create a new PageLister.
+    pub fn new(l: L) -> Self {
+        Self {
+            state: State::Idle(Some((
+                l,
+                PageContext {
+                    done: false,
+                    token: "".to_string(),
+                    entries: VecDeque::new(),
+                },
+            ))),
+        }
+    }
+}
+
+impl<L> oio::List for PageLister<L>
+where
+    L: PageList,
+{
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
+        loop {
+            match &mut self.state {
+                State::Idle(st) => {
+                    if let Some((_, ctx)) = st.as_mut() {
+                        if let Some(entry) = ctx.entries.pop_front() {
+                            return Poll::Ready(Ok(Some(entry)));
+                        }
+                        if ctx.done {
+                            return Poll::Ready(Ok(None));
+                        }
+                    }
+
+                    let (l, mut ctx) = st.take().expect("lister must be 
valid");
+                    let fut = async move {
+                        let res = l.next_page(&mut ctx).await;
+                        ((l, ctx), res)
+                    };
+                    self.state = State::Fetch(Box::pin(fut));
+                }
+                State::Fetch(fut) => {
+                    let ((l, ctx), res) = ready!(fut.as_mut().poll(cx));
+                    self.state = State::Idle(Some((l, ctx)));
+
+                    res?;
+                }
+            }
+        }
+    }
+}
diff --git a/core/src/services/alluxio/backend.rs 
b/core/src/services/alluxio/backend.rs
index d5c9b3a12..64db4461f 100644
--- a/core/src/services/alluxio/backend.rs
+++ b/core/src/services/alluxio/backend.rs
@@ -185,7 +185,7 @@ impl Accessor for AlluxioBackend {
     type BlockingReader = ();
     type Writer = AlluxioWriters;
     type BlockingWriter = ();
-    type Lister = AlluxioLister;
+    type Lister = oio::PageLister<AlluxioLister>;
     type BlockingLister = ();
 
     fn info(&self) -> AccessorInfo {
@@ -253,10 +253,8 @@ impl Accessor for AlluxioBackend {
     }
 
     async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, 
Self::Lister)> {
-        Ok((
-            RpList::default(),
-            AlluxioLister::new(self.core.clone(), path),
-        ))
+        let l = AlluxioLister::new(self.core.clone(), path);
+        Ok((RpList::default(), oio::PageLister::new(l)))
     }
 }
 
diff --git a/core/src/services/alluxio/lister.rs 
b/core/src/services/alluxio/lister.rs
index c1efa6b3a..5e56dbff8 100644
--- a/core/src/services/alluxio/lister.rs
+++ b/core/src/services/alluxio/lister.rs
@@ -29,8 +29,6 @@ pub struct AlluxioLister {
     core: Arc<AlluxioCore>,
 
     path: String,
-
-    done: bool,
 }
 
 impl AlluxioLister {
@@ -38,23 +36,19 @@ impl AlluxioLister {
         AlluxioLister {
             core,
             path: path.to_string(),
-            done: false,
         }
     }
 }
 
 #[async_trait]
-impl oio::List for AlluxioLister {
-    async fn next(&mut self) -> Result<Option<Vec<Entry>>> {
-        if self.done {
-            return Ok(None);
-        }
-
+impl oio::PageList for AlluxioLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
         let result = self.core.list_status(&self.path).await;
 
         match result {
             Ok(file_infos) => {
-                let mut entries = vec![];
+                ctx.done = true;
+
                 for file_info in file_infos {
                     let path: String = file_info.path.clone();
                     let path = if file_info.folder {
@@ -62,24 +56,18 @@ impl oio::List for AlluxioLister {
                     } else {
                         path
                     };
-                    entries.push(Entry::new(
+                    ctx.entries.push_back(Entry::new(
                         &build_rel_path(&self.core.root, &path),
                         file_info.try_into()?,
                     ));
                 }
 
-                if entries.is_empty() {
-                    return Ok(None);
-                }
-
-                self.done = true;
-
-                Ok(Some(entries))
+                Ok(())
             }
             Err(e) => {
                 if e.kind() == ErrorKind::NotFound {
-                    self.done = true;
-                    return Ok(None);
+                    ctx.done = true;
+                    return Ok(());
                 }
                 Err(e)
             }
diff --git a/core/src/services/azblob/backend.rs 
b/core/src/services/azblob/backend.rs
index a87907477..b3e8caad9 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -546,7 +546,7 @@ impl Accessor for AzblobBackend {
     type BlockingReader = ();
     type Writer = AzblobWriters;
     type BlockingWriter = ();
-    type Lister = AzblobLister;
+    type Lister = oio::PageLister<AzblobLister>;
     type BlockingLister = ();
 
     fn info(&self) -> AccessorInfo {
@@ -689,14 +689,14 @@ impl Accessor for AzblobBackend {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Lister)> {
-        let op = AzblobLister::new(
+        let l = AzblobLister::new(
             self.core.clone(),
             path.to_string(),
             args.recursive(),
             args.limit(),
         );
 
-        Ok((RpList::default(), op))
+        Ok((RpList::default(), oio::PageLister::new(l)))
     }
 
     async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
diff --git a/core/src/services/azblob/lister.rs 
b/core/src/services/azblob/lister.rs
index 25ccc1248..96e15821e 100644
--- a/core/src/services/azblob/lister.rs
+++ b/core/src/services/azblob/lister.rs
@@ -33,9 +33,6 @@ pub struct AzblobLister {
     path: String,
     delimiter: &'static str,
     limit: Option<usize>,
-
-    next_marker: String,
-    done: bool,
 }
 
 impl AzblobLister {
@@ -47,23 +44,16 @@ impl AzblobLister {
             path,
             delimiter,
             limit,
-
-            next_marker: "".to_string(),
-            done: false,
         }
     }
 }
 
 #[async_trait]
-impl oio::List for AzblobLister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.done {
-            return Ok(None);
-        }
-
+impl oio::PageList for AzblobLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
         let resp = self
             .core
-            .azblob_list_blobs(&self.path, &self.next_marker, self.delimiter, 
self.limit)
+            .azblob_list_blobs(&self.path, &ctx.token, self.delimiter, 
self.limit)
             .await?;
 
         if resp.status() != http::StatusCode::OK {
@@ -80,12 +70,11 @@ impl oio::List for AzblobLister {
         //
         // - Check `next_marker`
         if let Some(next_marker) = output.next_marker.as_ref() {
-            self.done = next_marker.is_empty();
+            ctx.done = next_marker.is_empty();
         };
-        self.next_marker = output.next_marker.clone().unwrap_or_default();
+        ctx.token = output.next_marker.clone().unwrap_or_default();
 
         let prefixes = output.blobs.blob_prefix;
-        let mut entries = Vec::with_capacity(prefixes.len() + 
output.blobs.blob.len());
 
         for prefix in prefixes {
             let de = oio::Entry::new(
@@ -93,7 +82,7 @@ impl oio::List for AzblobLister {
                 Metadata::new(EntryMode::DIR),
             );
 
-            entries.push(de)
+            ctx.entries.push_back(de)
         }
 
         for object in output.blobs.blob {
@@ -116,10 +105,10 @@ impl oio::List for AzblobLister {
 
             let de = oio::Entry::new(&build_rel_path(&self.core.root, 
&object.name), meta);
 
-            entries.push(de);
+            ctx.entries.push_back(de);
         }
 
-        Ok(Some(entries))
+        Ok(())
     }
 }
 
diff --git a/core/src/services/azdls/backend.rs 
b/core/src/services/azdls/backend.rs
index f5e08abe4..b1a7366e0 100644
--- a/core/src/services/azdls/backend.rs
+++ b/core/src/services/azdls/backend.rs
@@ -233,7 +233,7 @@ impl Accessor for AzdlsBackend {
     type BlockingReader = ();
     type Writer = AzdlsWriters;
     type BlockingWriter = ();
-    type Lister = AzdlsLister;
+    type Lister = oio::PageLister<AzdlsLister>;
     type BlockingLister = ();
 
     fn info(&self) -> AccessorInfo {
@@ -366,9 +366,9 @@ impl Accessor for AzdlsBackend {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Lister)> {
-        let op = AzdlsLister::new(self.core.clone(), path.to_string(), 
args.limit());
+        let l = AzdlsLister::new(self.core.clone(), path.to_string(), 
args.limit());
 
-        Ok((RpList::default(), op))
+        Ok((RpList::default(), oio::PageLister::new(l)))
     }
 }
 
diff --git a/core/src/services/azdls/lister.rs 
b/core/src/services/azdls/lister.rs
index a624aa31e..db7904f94 100644
--- a/core/src/services/azdls/lister.rs
+++ b/core/src/services/azdls/lister.rs
@@ -31,41 +31,29 @@ pub struct AzdlsLister {
 
     path: String,
     limit: Option<usize>,
-
-    continuation: String,
-    done: bool,
 }
 
 impl AzdlsLister {
     pub fn new(core: Arc<AzdlsCore>, path: String, limit: Option<usize>) -> 
Self {
-        Self {
-            core,
-            path,
-            limit,
-
-            continuation: "".to_string(),
-            done: false,
-        }
+        Self { core, path, limit }
     }
 }
 
 #[async_trait]
-impl oio::List for AzdlsLister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.done {
-            return Ok(None);
-        }
-
+impl oio::PageList for AzdlsLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
         let resp = self
             .core
-            .azdls_list(&self.path, &self.continuation, self.limit)
+            .azdls_list(&self.path, &ctx.token, self.limit)
             .await?;
 
-        // Azdls will return not found for not-exist path.
+        // azdls will return not found for not-exist path.
         if resp.status() == http::StatusCode::NOT_FOUND {
             resp.into_body().consume().await?;
-            return Ok(None);
+            ctx.done = true;
+            return Ok(());
         }
+
         if resp.status() != http::StatusCode::OK {
             return Err(parse_error(resp).await?);
         }
@@ -76,19 +64,15 @@ impl oio::List for AzdlsLister {
                 Error::new(ErrorKind::Unexpected, "header value is not valid 
string")
                     .set_source(err)
             })?;
-            self.continuation = value.to_string();
+            ctx.token = value.to_string();
         } else {
-            self.continuation = "".to_string();
-            self.done = true;
+            ctx.token = "".to_string();
+            ctx.done = true;
         }
 
         let bs = resp.into_body().bytes().await?;
 
-        let output: Output = de::from_slice(&bs).map_err(|e| {
-            Error::new(ErrorKind::Unexpected, "deserialize json from 
response").set_source(e)
-        })?;
-
-        let mut entries = Vec::with_capacity(output.paths.len());
+        let output: Output = 
de::from_slice(&bs).map_err(new_json_deserialize_error)?;
 
         for object in output.paths {
             // Azdls will return `"true"` and `"false"` for is_directory.
@@ -114,10 +98,10 @@ impl oio::List for AzdlsLister {
 
             let de = oio::Entry::new(&path, meta);
 
-            entries.push(de);
+            ctx.entries.push_back(de);
         }
 
-        Ok(Some(entries))
+        Ok(())
     }
 }
 
diff --git a/core/src/services/azfile/backend.rs 
b/core/src/services/azfile/backend.rs
index d11b34efd..3b87cb1f0 100644
--- a/core/src/services/azfile/backend.rs
+++ b/core/src/services/azfile/backend.rs
@@ -251,7 +251,7 @@ impl Accessor for AzfileBackend {
     type BlockingReader = ();
     type Writer = AzfileWriters;
     type BlockingWriter = ();
-    type Lister = AzfileLister;
+    type Lister = oio::PageLister<AzfileLister>;
     type BlockingLister = ();
 
     fn info(&self) -> AccessorInfo {
@@ -396,9 +396,9 @@ impl Accessor for AzfileBackend {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Lister)> {
-        let op = AzfileLister::new(self.core.clone(), path.to_string(), 
args.limit());
+        let l = AzfileLister::new(self.core.clone(), path.to_string(), 
args.limit());
 
-        Ok((RpList::default(), op))
+        Ok((RpList::default(), oio::PageLister::new(l)))
     }
 }
 
diff --git a/core/src/services/azfile/lister.rs 
b/core/src/services/azfile/lister.rs
index cd17aef33..cdd8fc980 100644
--- a/core/src/services/azfile/lister.rs
+++ b/core/src/services/azfile/lister.rs
@@ -32,39 +32,28 @@ pub struct AzfileLister {
     core: Arc<AzfileCore>,
     path: String,
     limit: Option<usize>,
-    done: bool,
-    continuation: String,
 }
 
 impl AzfileLister {
     pub fn new(core: Arc<AzfileCore>, path: String, limit: Option<usize>) -> 
Self {
-        Self {
-            core,
-            path,
-            limit,
-            done: false,
-            continuation: "".to_string(),
-        }
+        Self { core, path, limit }
     }
 }
 
 #[async_trait]
-impl oio::List for AzfileLister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.done {
-            return Ok(None);
-        }
-
+impl oio::PageList for AzfileLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
         let resp = self
             .core
-            .azfile_list(&self.path, &self.limit, &self.continuation)
+            .azfile_list(&self.path, &self.limit, &ctx.token)
             .await?;
 
         let status = resp.status();
 
         if status != StatusCode::OK {
             if status == StatusCode::NOT_FOUND {
-                return Ok(None);
+                ctx.done = true;
+                return Ok(());
             }
             return Err(parse_error(resp).await?);
         }
@@ -73,11 +62,13 @@ impl oio::List for AzfileLister {
 
         let text = String::from_utf8(bs.to_vec()).expect("response convert to 
string must success");
 
-        let results: EnumerationResults = from_str(&text).map_err(|e| {
-            Error::new(ErrorKind::Unexpected, "deserialize xml from 
response").set_source(e)
-        })?;
+        let results: EnumerationResults = 
from_str(&text).map_err(new_xml_deserialize_error)?;
 
-        let mut entries = Vec::new();
+        if results.next_marker.is_empty() {
+            ctx.done = true;
+        } else {
+            ctx.token = results.next_marker;
+        }
 
         for file in results.entries.file {
             let meta = Metadata::new(EntryMode::FILE)
@@ -85,7 +76,7 @@ impl oio::List for AzfileLister {
                 
.with_content_length(file.properties.content_length.unwrap_or(0))
                 
.with_last_modified(parse_datetime_from_rfc2822(&file.properties.last_modified)?);
             let path = self.path.clone().trim_start_matches('/').to_string() + 
&file.name;
-            entries.push(oio::Entry::new(&path, meta));
+            ctx.entries.push_back(oio::Entry::new(&path, meta));
         }
 
         for dir in results.entries.directory {
@@ -93,20 +84,10 @@ impl oio::List for AzfileLister {
                 .with_etag(dir.properties.etag)
                 
.with_last_modified(parse_datetime_from_rfc2822(&dir.properties.last_modified)?);
             let path = self.path.clone().trim_start_matches('/').to_string() + 
&dir.name + "/";
-            entries.push(oio::Entry::new(&path, meta));
+            ctx.entries.push_back(oio::Entry::new(&path, meta));
         }
 
-        if results.next_marker.is_empty() {
-            self.done = true;
-        } else {
-            self.continuation = results.next_marker;
-        }
-
-        if entries.is_empty() {
-            Ok(None)
-        } else {
-            Ok(Some(entries))
-        }
+        Ok(())
     }
 }
 
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index dbf2f9b1f..cf6616c34 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -245,7 +245,7 @@ impl Accessor for CosBackend {
     type BlockingReader = ();
     type Writer = CosWriters;
     type BlockingWriter = ();
-    type Lister = CosLister;
+    type Lister = oio::PageLister<CosLister>;
     type BlockingLister = ();
 
     fn info(&self) -> AccessorInfo {
@@ -422,9 +422,7 @@ impl Accessor for CosBackend {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Lister)> {
-        Ok((
-            RpList::default(),
-            CosLister::new(self.core.clone(), path, args.recursive(), 
args.limit()),
-        ))
+        let l = CosLister::new(self.core.clone(), path, args.recursive(), 
args.limit());
+        Ok((RpList::default(), oio::PageLister::new(l)))
     }
 }
diff --git a/core/src/services/cos/lister.rs b/core/src/services/cos/lister.rs
index 55d4bd812..a6a402a0d 100644
--- a/core/src/services/cos/lister.rs
+++ b/core/src/services/cos/lister.rs
@@ -36,9 +36,6 @@ pub struct CosLister {
     path: String,
     delimiter: &'static str,
     limit: Option<usize>,
-
-    next_marker: String,
-    done: bool,
 }
 
 impl CosLister {
@@ -49,23 +46,16 @@ impl CosLister {
             path: path.to_string(),
             delimiter,
             limit,
-
-            next_marker: "".to_string(),
-            done: false,
         }
     }
 }
 
 #[async_trait]
-impl oio::List for CosLister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.done {
-            return Ok(None);
-        }
-
+impl oio::PageList for CosLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
         let resp = self
             .core
-            .cos_list_objects(&self.path, &self.next_marker, self.delimiter, 
self.limit)
+            .cos_list_objects(&self.path, &ctx.token, self.delimiter, 
self.limit)
             .await?;
 
         if resp.status() != http::StatusCode::OK {
@@ -80,22 +70,19 @@ impl oio::List for CosLister {
         // Try our best to check whether this list is done.
         //
         // - Check `next_marker`
-        self.done = match output.next_marker.as_ref() {
+        ctx.done = match output.next_marker.as_ref() {
             None => true,
             Some(next_marker) => next_marker.is_empty(),
         };
-        self.next_marker = output.next_marker.clone().unwrap_or_default();
-
-        let common_prefixes = output.common_prefixes;
-        let mut entries = Vec::with_capacity(common_prefixes.len() + 
output.contents.len());
+        ctx.token = output.next_marker.clone().unwrap_or_default();
 
-        for prefix in common_prefixes {
+        for prefix in output.common_prefixes {
             let de = oio::Entry::new(
                 &build_rel_path(&self.core.root, &prefix.prefix),
                 Metadata::new(EntryMode::DIR),
             );
 
-            entries.push(de);
+            ctx.entries.push_back(de);
         }
 
         for object in output.contents {
@@ -107,10 +94,10 @@ impl oio::List for CosLister {
 
             let de = oio::Entry::new(&build_rel_path(&self.core.root, 
&object.key), meta);
 
-            entries.push(de);
+            ctx.entries.push_back(de);
         }
 
-        Ok(Some(entries))
+        Ok(())
     }
 }
 
diff --git a/core/src/services/dbfs/backend.rs 
b/core/src/services/dbfs/backend.rs
index 4b13e10ba..8da90e2b7 100644
--- a/core/src/services/dbfs/backend.rs
+++ b/core/src/services/dbfs/backend.rs
@@ -158,7 +158,7 @@ impl Accessor for DbfsBackend {
     type BlockingReader = ();
     type Writer = oio::OneShotWriter<DbfsWriter>;
     type BlockingWriter = ();
-    type Lister = DbfsLister;
+    type Lister = oio::PageLister<DbfsLister>;
     type BlockingLister = ();
 
     fn info(&self) -> AccessorInfo {
@@ -276,9 +276,9 @@ impl Accessor for DbfsBackend {
     }
 
     async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, 
Self::Lister)> {
-        let op = DbfsLister::new(self.core.clone(), path.to_string());
+        let l = DbfsLister::new(self.core.clone(), path.to_string());
 
-        Ok((RpList::default(), op))
+        Ok((RpList::default(), oio::PageLister::new(l)))
     }
 }
 
diff --git a/core/src/services/dbfs/lister.rs b/core/src/services/dbfs/lister.rs
index fd27b912b..093b4c478 100644
--- a/core/src/services/dbfs/lister.rs
+++ b/core/src/services/dbfs/lister.rs
@@ -30,32 +30,24 @@ use super::error::parse_error;
 pub struct DbfsLister {
     core: Arc<DbfsCore>,
     path: String,
-    done: bool,
 }
 
 impl DbfsLister {
     pub fn new(core: Arc<DbfsCore>, path: String) -> Self {
-        Self {
-            core,
-            path,
-            done: false,
-        }
+        Self { core, path }
     }
 }
 
 #[async_trait]
-impl oio::List for DbfsLister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.done {
-            return Ok(None);
-        }
-
+impl oio::PageList for DbfsLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
         let response = self.core.dbfs_list(&self.path).await?;
 
         let status_code = response.status();
         if !status_code.is_success() {
             if status_code == StatusCode::NOT_FOUND {
-                return Ok(None);
+                ctx.done = true;
+                return Ok(());
             }
             let error = parse_error(response).await?;
             return Err(error);
@@ -65,11 +57,9 @@ impl oio::List for DbfsLister {
         let mut decoded_response =
             
serde_json::from_slice::<DbfsOutputList>(&bytes).map_err(new_json_deserialize_error)?;
 
-        self.done = true;
-
-        let mut entries = Vec::with_capacity(decoded_response.files.len());
+        ctx.done = true;
 
-        while let Some(status) = decoded_response.files.pop() {
+        for status in decoded_response.files {
             let entry: oio::Entry = match status.is_dir {
                 true => {
                     let normalized_path = format!("{}/", &status.path);
@@ -88,9 +78,9 @@ impl oio::List for DbfsLister {
                     oio::Entry::new(&status.path, meta)
                 }
             };
-            entries.push(entry);
+            ctx.entries.push_back(entry);
         }
-        Ok(Some(entries))
+        Ok(())
     }
 }
 
diff --git a/core/src/services/fs/lister.rs b/core/src/services/fs/lister.rs
index 403f674e8..13108b691 100644
--- a/core/src/services/fs/lister.rs
+++ b/core/src/services/fs/lister.rs
@@ -15,10 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::fs::FileType;
 use std::path::Path;
 use std::path::PathBuf;
+use std::task::{ready, Context, Poll};
 
 use async_trait::async_trait;
+use futures::future::BoxFuture;
+use futures::FutureExt;
 
 use crate::raw::*;
 use crate::EntryMode;
@@ -30,6 +34,8 @@ pub struct FsLister<P> {
 
     size: usize,
     rd: P,
+
+    fut: Option<BoxFuture<'static, (tokio::fs::DirEntry, Result<FileType>)>>,
 }
 
 impl<P> FsLister<P> {
@@ -38,19 +44,35 @@ impl<P> FsLister<P> {
             root: root.to_owned(),
             size: limit.unwrap_or(1000),
             rd,
+
+            fut: None,
         }
     }
 }
 
+/// # Safety
+///
+/// We will only take `&mut Self` reference for FsLister.
+unsafe impl<P> Sync for FsLister<P> {}
+
 #[async_trait]
 impl oio::List for FsLister<tokio::fs::ReadDir> {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        let mut oes: Vec<oio::Entry> = Vec::with_capacity(self.size);
-
-        for _ in 0..self.size {
-            let de = match 
self.rd.next_entry().await.map_err(new_std_io_error)? {
-                Some(de) => de,
-                None => break,
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
+        if let Some(fut) = self.fut.as_mut() {
+            let (de, ft) = futures::ready!(fut.poll_unpin(cx));
+            let ft = match ft {
+                Ok(ft) => {
+                    self.fut = None;
+                    ft
+                }
+                Err(e) => {
+                    let fut = async move {
+                        let ft = 
de.file_type().await.map_err(new_std_io_error);
+                        (de, ft)
+                    };
+                    self.fut = Some(Box::pin(fut));
+                    return Poll::Ready(Err(e));
+                }
             };
 
             let entry_path = de.path();
@@ -62,25 +84,30 @@ impl oio::List for FsLister<tokio::fs::ReadDir> {
                     .replace('\\', "/"),
             );
 
-            // On Windows and most Unix platforms this function is free
-            // (no extra system calls needed), but some Unix platforms may
-            // require the equivalent call to symlink_metadata to learn about
-            // the target file type.
-            let file_type = de.file_type().await.map_err(new_std_io_error)?;
-
-            let d = if file_type.is_file() {
+            let d = if ft.is_file() {
                 oio::Entry::new(&rel_path, Metadata::new(EntryMode::FILE))
-            } else if file_type.is_dir() {
+            } else if ft.is_dir() {
                 // Make sure we are returning the correct path.
                 oio::Entry::new(&format!("{rel_path}/"), 
Metadata::new(EntryMode::DIR))
             } else {
                 oio::Entry::new(&rel_path, Metadata::new(EntryMode::Unknown))
             };
 
-            oes.push(d)
+            return Poll::Ready(Ok(Some(d)));
         }
 
-        Ok(if oes.is_empty() { None } else { Some(oes) })
+        let de = 
ready!(self.rd.poll_next_entry(cx)).map_err(new_std_io_error)?;
+        match de {
+            Some(de) => {
+                let fut = async move {
+                    let ft = de.file_type().await.map_err(new_std_io_error);
+                    (de, ft)
+                };
+                self.fut = Some(Box::pin(fut));
+                self.poll_next(cx)
+            }
+            None => Poll::Ready(Ok(None)),
+        }
     }
 }
 
diff --git a/core/src/services/ftp/lister.rs b/core/src/services/ftp/lister.rs
index ba0892e8c..354bd4379 100644
--- a/core/src/services/ftp/lister.rs
+++ b/core/src/services/ftp/lister.rs
@@ -17,6 +17,7 @@
 
 use std::str;
 use std::str::FromStr;
+use std::task::{Context, Poll};
 use std::vec::IntoIter;
 
 use async_trait::async_trait;
@@ -43,35 +44,29 @@ impl FtpLister {
 
 #[async_trait]
 impl oio::List for FtpLister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        let mut oes: Vec<oio::Entry> = Vec::with_capacity(self.size);
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
+        let de = match self.file_iter.next() {
+            Some(file_str) => File::from_str(file_str.as_str()).map_err(|e| {
+                Error::new(ErrorKind::Unexpected, "parse file from 
response").set_source(e)
+            })?,
+            None => return Poll::Ready(Ok(None)),
+        };
 
-        for _ in 0..self.size {
-            let de = match self.file_iter.next() {
-                Some(file_str) => 
File::from_str(file_str.as_str()).map_err(|e| {
-                    Error::new(ErrorKind::Unexpected, "parse file from 
response").set_source(e)
-                })?,
-                None => break,
-            };
+        let path = self.path.to_string() + de.name();
 
-            let path = self.path.to_string() + de.name();
+        let entry = if de.is_file() {
+            oio::Entry::new(
+                &path,
+                Metadata::new(EntryMode::FILE)
+                    .with_content_length(de.size() as u64)
+                    .with_last_modified(de.modified().into()),
+            )
+        } else if de.is_directory() {
+            oio::Entry::new(&format!("{}/", &path), 
Metadata::new(EntryMode::DIR))
+        } else {
+            oio::Entry::new(&path, Metadata::new(EntryMode::Unknown))
+        };
 
-            let d = if de.is_file() {
-                oio::Entry::new(
-                    &path,
-                    Metadata::new(EntryMode::FILE)
-                        .with_content_length(de.size() as u64)
-                        .with_last_modified(de.modified().into()),
-                )
-            } else if de.is_directory() {
-                oio::Entry::new(&format!("{}/", &path), 
Metadata::new(EntryMode::DIR))
-            } else {
-                oio::Entry::new(&path, Metadata::new(EntryMode::Unknown))
-            };
-
-            oes.push(d)
-        }
-
-        Ok(if oes.is_empty() { None } else { Some(oes) })
+        Poll::Ready(Ok(Some(entry)))
     }
 }
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index c2340b7c5..d528479dc 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -312,7 +312,7 @@ impl Accessor for GcsBackend {
     type BlockingReader = ();
     type Writer = GcsWriters;
     type BlockingWriter = ();
-    type Lister = GcsLister;
+    type Lister = oio::PageLister<GcsLister>;
     type BlockingLister = ();
 
     fn info(&self) -> AccessorInfo {
@@ -472,16 +472,15 @@ impl Accessor for GcsBackend {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Lister)> {
-        Ok((
-            RpList::default(),
-            GcsLister::new(
-                self.core.clone(),
-                path,
-                args.recursive(),
-                args.limit(),
-                args.start_after(),
-            ),
-        ))
+        let l = GcsLister::new(
+            self.core.clone(),
+            path,
+            args.recursive(),
+            args.limit(),
+            args.start_after(),
+        );
+
+        Ok((RpList::default(), oio::PageLister::new(l)))
     }
 
     async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
diff --git a/core/src/services/gcs/lister.rs b/core/src/services/gcs/lister.rs
index 97496f353..244ca2a93 100644
--- a/core/src/services/gcs/lister.rs
+++ b/core/src/services/gcs/lister.rs
@@ -38,9 +38,6 @@ pub struct GcsLister {
     /// Filter results to objects whose names are lexicographically
     /// **equal to or after** startOffset
     start_after: Option<String>,
-
-    page_token: String,
-    done: bool,
 }
 
 impl GcsLister {
@@ -60,28 +57,25 @@ impl GcsLister {
             delimiter,
             limit,
             start_after: start_after.map(String::from),
-
-            page_token: "".to_string(),
-            done: false,
         }
     }
 }
 
 #[async_trait]
-impl oio::List for GcsLister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.done {
-            return Ok(None);
-        }
-
+impl oio::PageList for GcsLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
         let resp = self
             .core
             .gcs_list_objects(
                 &self.path,
-                &self.page_token,
+                &ctx.token,
                 self.delimiter,
                 self.limit,
-                self.start_after.clone(),
+                if ctx.token.is_empty() {
+                    self.start_after.clone()
+                } else {
+                    None
+                },
             )
             .await?;
 
@@ -94,20 +88,18 @@ impl oio::List for GcsLister {
             
serde_json::from_slice(&bytes).map_err(new_json_deserialize_error)?;
 
         if let Some(token) = &output.next_page_token {
-            self.page_token = token.clone();
+            ctx.token = token.clone();
         } else {
-            self.done = true;
+            ctx.done = true;
         }
 
-        let mut entries = Vec::with_capacity(output.prefixes.len() + 
output.items.len());
-
         for prefix in output.prefixes {
             let de = oio::Entry::new(
                 &build_rel_path(&self.core.root, &prefix),
                 Metadata::new(EntryMode::DIR),
             );
 
-            entries.push(de);
+            ctx.entries.push_back(de);
         }
 
         for object in output.items {
@@ -139,10 +131,10 @@ impl oio::List for GcsLister {
 
             let de = oio::Entry::new(path, meta);
 
-            entries.push(de);
+            ctx.entries.push_back(de);
         }
 
-        Ok(Some(entries))
+        Ok(())
     }
 }
 
diff --git a/core/src/services/gdrive/backend.rs 
b/core/src/services/gdrive/backend.rs
index f0d9da1b1..8f82ede86 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -45,7 +45,7 @@ impl Accessor for GdriveBackend {
     type BlockingReader = ();
     type Writer = oio::OneShotWriter<GdriveWriter>;
     type BlockingWriter = ();
-    type Lister = GdriveLister;
+    type Lister = oio::PageLister<GdriveLister>;
     type BlockingLister = ();
 
     fn info(&self) -> AccessorInfo {
@@ -213,10 +213,8 @@ impl Accessor for GdriveBackend {
     }
 
     async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, 
Self::Lister)> {
-        Ok((
-            RpList::default(),
-            GdriveLister::new(path.into(), self.core.clone()),
-        ))
+        let l = GdriveLister::new(path.into(), self.core.clone());
+        Ok((RpList::default(), oio::PageLister::new(l)))
     }
 
     async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> 
Result<RpCopy> {
diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs
index 3239b281b..f6517419d 100644
--- a/core/src/services/gdrive/core.rs
+++ b/core/src/services/gdrive/core.rs
@@ -310,7 +310,7 @@ impl GdriveCore {
         &self,
         path: &str,
         page_size: i32,
-        next_page_token: Option<String>,
+        next_page_token: &str,
     ) -> Result<Response<IncomingAsyncBody>> {
         let file_id = self.get_file_id_by_path(path).await;
 
@@ -341,22 +341,13 @@ impl GdriveCore {
             },
         };
 
-        let url = match next_page_token {
-            Some(page_token) => {
-                format!(
-                    
"https://www.googleapis.com/drive/v3/files?pageSize={}&pageToken={}&q={}";,
-                    page_size,
-                    page_token,
-                    percent_encode_path(q.as_str())
-                )
-            }
-            None => {
-                format!(
-                    
"https://www.googleapis.com/drive/v3/files?pageSize={}&q={}";,
-                    page_size,
-                    percent_encode_path(q.as_str())
-                )
-            }
+        let mut url = format!(
+            "https://www.googleapis.com/drive/v3/files?pageSize={}&q={}";,
+            page_size,
+            percent_encode_path(&q)
+        );
+        if !next_page_token.is_empty() {
+            url += &format!("&pageToken={next_page_token}");
         };
 
         let mut req = Request::get(&url)
diff --git a/core/src/services/gdrive/lister.rs 
b/core/src/services/gdrive/lister.rs
index 53d8f86c4..053a0fabb 100644
--- a/core/src/services/gdrive/lister.rs
+++ b/core/src/services/gdrive/lister.rs
@@ -23,80 +23,56 @@ use http::StatusCode;
 use super::core::GdriveCore;
 use super::core::GdriveFileList;
 use super::error::parse_error;
-use crate::raw::build_rel_path;
-use crate::raw::build_rooted_abs_path;
-use crate::raw::new_json_deserialize_error;
-use crate::raw::oio::{self};
-use crate::EntryMode;
-use crate::Metadata;
-use crate::Result;
+use crate::raw::*;
+use crate::*;
+
 pub struct GdriveLister {
     path: String,
     core: Arc<GdriveCore>,
-    next_page_token: Option<String>,
-    done: bool,
 }
 
 impl GdriveLister {
     pub fn new(path: String, core: Arc<GdriveCore>) -> Self {
-        Self {
-            path,
-            core,
-            next_page_token: None,
-            done: false,
-        }
+        Self { path, core }
     }
 }
 
 #[async_trait]
-impl oio::List for GdriveLister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.done {
-            return Ok(None);
-        }
-
-        let resp = self
-            .core
-            .gdrive_list(&self.path, 100, self.next_page_token.clone())
-            .await?;
+impl oio::PageList for GdriveLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
+        let resp = self.core.gdrive_list(&self.path, 100, &ctx.token).await?;
 
         let bytes = match resp.status() {
             StatusCode::OK => resp.into_body().bytes().await?,
             _ => return Err(parse_error(resp).await?),
         };
 
-        if bytes.is_empty() {
-            return Ok(None);
-        }
-
         let decoded_response =
             
serde_json::from_slice::<GdriveFileList>(&bytes).map_err(new_json_deserialize_error)?;
 
         if let Some(next_page_token) = decoded_response.next_page_token {
-            self.next_page_token = Some(next_page_token);
+            ctx.token = next_page_token;
         } else {
-            self.done = true;
+            ctx.done = true;
         }
 
-        let entries: Vec<oio::Entry> = decoded_response
-            .files
-            .into_iter()
-            .map(|mut file| {
-                let file_type = if file.mime_type.as_str() == 
"application/vnd.google-apps.folder" {
-                    file.name = format!("{}/", file.name);
-                    EntryMode::DIR
-                } else {
-                    EntryMode::FILE
-                };
+        for mut file in decoded_response.files {
+            let file_type = if file.mime_type.as_str() == 
"application/vnd.google-apps.folder" {
+                file.name = format!("{}/", file.name);
+                EntryMode::DIR
+            } else {
+                EntryMode::FILE
+            };
 
-                let root = &self.core.root;
-                let path = format!("{}{}", build_rooted_abs_path(root, 
&self.path), file.name);
-                let normalized_path = build_rel_path(root, &path);
+            let root = &self.core.root;
+            let path = format!("{}{}", build_rooted_abs_path(root, 
&self.path), file.name);
+            let normalized_path = build_rel_path(root, &path);
 
-                oio::Entry::new(&normalized_path, Metadata::new(file_type))
-            })
-            .collect();
+            let entry = oio::Entry::new(&normalized_path, 
Metadata::new(file_type));
+
+            ctx.entries.push_back(entry);
+        }
 
-        Ok(Some(entries))
+        Ok(())
     }
 }
diff --git a/core/src/services/hdfs/lister.rs b/core/src/services/hdfs/lister.rs
index 5e31c1e14..ea8a57572 100644
--- a/core/src/services/hdfs/lister.rs
+++ b/core/src/services/hdfs/lister.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use async_trait::async_trait;
+use std::task::{Context, Poll};
 
 use crate::raw::*;
 use crate::EntryMode;
@@ -42,33 +43,27 @@ impl HdfsLister {
 
 #[async_trait]
 impl oio::List for HdfsLister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        let mut oes: Vec<oio::Entry> = Vec::with_capacity(self.size);
-
-        for _ in 0..self.size {
-            let de = match self.rd.next() {
-                Some(de) => de,
-                None => break,
-            };
-
-            let path = build_rel_path(&self.root, de.path());
-
-            let d = if de.is_file() {
-                let meta = Metadata::new(EntryMode::FILE)
-                    .with_content_length(de.len())
-                    .with_last_modified(de.modified().into());
-                oio::Entry::new(&path, meta)
-            } else if de.is_dir() {
-                // Make sure we are returning the correct path.
-                oio::Entry::new(&format!("{path}/"), 
Metadata::new(EntryMode::DIR))
-            } else {
-                oio::Entry::new(&path, Metadata::new(EntryMode::Unknown))
-            };
-
-            oes.push(d)
-        }
-
-        Ok(if oes.is_empty() { None } else { Some(oes) })
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
+        let de = match self.rd.next() {
+            Some(de) => de,
+            None => return Poll::Ready(Ok(None)),
+        };
+
+        let path = build_rel_path(&self.root, de.path());
+
+        let entry = if de.is_file() {
+            let meta = Metadata::new(EntryMode::FILE)
+                .with_content_length(de.len())
+                .with_last_modified(de.modified().into());
+            oio::Entry::new(&path, meta)
+        } else if de.is_dir() {
+            // Make sure we are returning the correct path.
+            oio::Entry::new(&format!("{path}/"), Metadata::new(EntryMode::DIR))
+        } else {
+            oio::Entry::new(&path, Metadata::new(EntryMode::Unknown))
+        };
+
+        Poll::Ready(Ok(Some(entry)))
     }
 }
 
diff --git a/core/src/services/ipfs/backend.rs 
b/core/src/services/ipfs/backend.rs
index 2c1681dbe..915e2b798 100644
--- a/core/src/services/ipfs/backend.rs
+++ b/core/src/services/ipfs/backend.rs
@@ -165,7 +165,7 @@ impl Accessor for IpfsBackend {
     type BlockingReader = ();
     type Writer = ();
     type BlockingWriter = ();
-    type Lister = DirStream;
+    type Lister = oio::PageLister<DirStream>;
     type BlockingLister = ();
 
     fn info(&self) -> AccessorInfo {
@@ -351,10 +351,8 @@ impl Accessor for IpfsBackend {
     }
 
     async fn list(&self, path: &str, _: OpList) -> Result<(RpList, 
Self::Lister)> {
-        Ok((
-            RpList::default(),
-            DirStream::new(Arc::new(self.clone()), path),
-        ))
+        let l = DirStream::new(Arc::new(self.clone()), path);
+        Ok((RpList::default(), oio::PageLister::new(l)))
     }
 }
 
@@ -415,7 +413,6 @@ impl IpfsBackend {
 pub struct DirStream {
     backend: Arc<IpfsBackend>,
     path: String,
-    consumed: bool,
 }
 
 impl DirStream {
@@ -423,18 +420,13 @@ impl DirStream {
         Self {
             backend,
             path: path.to_string(),
-            consumed: false,
         }
     }
 }
 
 #[async_trait]
-impl oio::List for DirStream {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.consumed {
-            return Ok(None);
-        }
-
+impl oio::PageList for DirStream {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
         let resp = self.backend.ipfs_list(&self.path).await?;
 
         if resp.status() != StatusCode::OK {
@@ -452,8 +444,6 @@ impl oio::List for DirStream {
             .map(|v| v.name.unwrap())
             .collect::<Vec<String>>();
 
-        let mut oes = Vec::with_capacity(names.len());
-
         for mut name in names {
             let meta = self
                 .backend
@@ -465,10 +455,10 @@ impl oio::List for DirStream {
                 name += "/";
             }
 
-            oes.push(oio::Entry::new(&name, meta))
+            ctx.entries.push_back(oio::Entry::new(&name, meta))
         }
 
-        self.consumed = true;
-        Ok(Some(oes))
+        ctx.done = true;
+        Ok(())
     }
 }
diff --git a/core/src/services/ipmfs/backend.rs 
b/core/src/services/ipmfs/backend.rs
index 26c06d569..7a9da9b4e 100644
--- a/core/src/services/ipmfs/backend.rs
+++ b/core/src/services/ipmfs/backend.rs
@@ -66,7 +66,7 @@ impl Accessor for IpmfsBackend {
     type BlockingReader = ();
     type Writer = oio::OneShotWriter<IpmfsWriter>;
     type BlockingWriter = ();
-    type Lister = IpmfsLister;
+    type Lister = oio::PageLister<IpmfsLister>;
     type BlockingLister = ();
 
     fn info(&self) -> AccessorInfo {
@@ -171,10 +171,8 @@ impl Accessor for IpmfsBackend {
     }
 
     async fn list(&self, path: &str, _: OpList) -> Result<(RpList, 
Self::Lister)> {
-        Ok((
-            RpList::default(),
-            IpmfsLister::new(Arc::new(self.clone()), &self.root, path),
-        ))
+        let l = IpmfsLister::new(Arc::new(self.clone()), &self.root, path);
+        Ok((RpList::default(), oio::PageLister::new(l)))
     }
 }
 
diff --git a/core/src/services/ipmfs/lister.rs 
b/core/src/services/ipmfs/lister.rs
index 535c982b1..902d32592 100644
--- a/core/src/services/ipmfs/lister.rs
+++ b/core/src/services/ipmfs/lister.rs
@@ -47,12 +47,8 @@ impl IpmfsLister {
 }
 
 #[async_trait]
-impl oio::List for IpmfsLister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.consumed {
-            return Ok(None);
-        }
-
+impl oio::PageList for IpmfsLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
         let resp = self.backend.ipmfs_ls(&self.path).await?;
 
         if resp.status() != StatusCode::OK {
@@ -64,33 +60,24 @@ impl oio::List for IpmfsLister {
             serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
 
         // Mark dir stream has been consumed.
-        self.consumed = true;
-
-        Ok(Some(
-            entries_body
-                .entries
-                .unwrap_or_default()
-                .into_iter()
-                .map(|object| {
-                    let path = match object.mode() {
-                        EntryMode::FILE => {
-                            format!("{}{}", &self.path, object.name)
-                        }
-                        EntryMode::DIR => {
-                            format!("{}{}/", &self.path, object.name)
-                        }
-                        EntryMode::Unknown => unreachable!(),
-                    };
-
-                    let path = build_rel_path(&self.root, &path);
-
-                    oio::Entry::new(
-                        &path,
-                        
Metadata::new(object.mode()).with_content_length(object.size),
-                    )
-                })
-                .collect(),
-        ))
+        ctx.done = true;
+
+        for object in entries_body.entries.unwrap_or_default() {
+            let path = match object.mode() {
+                EntryMode::FILE => format!("{}{}", &self.path, object.name),
+                EntryMode::DIR => format!("{}{}/", &self.path, object.name),
+                EntryMode::Unknown => unreachable!(),
+            };
+
+            let path = build_rel_path(&self.root, &path);
+
+            ctx.entries.push_back(oio::Entry::new(
+                &path,
+                Metadata::new(object.mode()).with_content_length(object.size),
+            ));
+        }
+
+        Ok(())
     }
 }
 
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index 6cfe43e4f..4f84852b5 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -252,7 +252,7 @@ impl Accessor for ObsBackend {
     type BlockingReader = ();
     type Writer = ObsWriters;
     type BlockingWriter = ();
-    type Lister = ObsLister;
+    type Lister = oio::PageLister<ObsLister>;
     type BlockingLister = ();
 
     fn info(&self) -> AccessorInfo {
@@ -428,9 +428,7 @@ impl Accessor for ObsBackend {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Lister)> {
-        Ok((
-            RpList::default(),
-            ObsLister::new(self.core.clone(), path, args.recursive(), 
args.limit()),
-        ))
+        let l = ObsLister::new(self.core.clone(), path, args.recursive(), 
args.limit());
+        Ok((RpList::default(), oio::PageLister::new(l)))
     }
 }
diff --git a/core/src/services/obs/lister.rs b/core/src/services/obs/lister.rs
index c96f51e3e..831427ab1 100644
--- a/core/src/services/obs/lister.rs
+++ b/core/src/services/obs/lister.rs
@@ -36,9 +36,6 @@ pub struct ObsLister {
     path: String,
     delimiter: &'static str,
     limit: Option<usize>,
-
-    next_marker: String,
-    done: bool,
 }
 
 impl ObsLister {
@@ -50,23 +47,16 @@ impl ObsLister {
             path: path.to_string(),
             delimiter,
             limit,
-
-            next_marker: "".to_string(),
-            done: false,
         }
     }
 }
 
 #[async_trait]
-impl oio::List for ObsLister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.done {
-            return Ok(None);
-        }
-
+impl oio::PageList for ObsLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
         let resp = self
             .core
-            .obs_list_objects(&self.path, &self.next_marker, self.delimiter, 
self.limit)
+            .obs_list_objects(&self.path, &ctx.token, self.delimiter, 
self.limit)
             .await?;
 
         if resp.status() != http::StatusCode::OK {
@@ -81,14 +71,13 @@ impl oio::List for ObsLister {
         // Try our best to check whether this list is done.
         //
         // - Check `next_marker`
-        self.done = match output.next_marker.as_ref() {
+        ctx.done = match output.next_marker.as_ref() {
             None => true,
             Some(next_marker) => next_marker.is_empty(),
         };
-        self.next_marker = output.next_marker.clone().unwrap_or_default();
+        ctx.token = output.next_marker.clone().unwrap_or_default();
 
         let common_prefixes = output.common_prefixes;
-        let mut entries = Vec::with_capacity(common_prefixes.len() + 
output.contents.len());
 
         for prefix in common_prefixes {
             let de = oio::Entry::new(
@@ -96,7 +85,7 @@ impl oio::List for ObsLister {
                 Metadata::new(EntryMode::DIR),
             );
 
-            entries.push(de);
+            ctx.entries.push_back(de);
         }
 
         for object in output.contents {
@@ -108,10 +97,10 @@ impl oio::List for ObsLister {
 
             let de = oio::Entry::new(&build_rel_path(&self.core.root, 
&object.key), meta);
 
-            entries.push(de);
+            ctx.entries.push_back(de);
         }
 
-        Ok(Some(entries))
+        Ok(())
     }
 }
 
diff --git a/core/src/services/onedrive/backend.rs 
b/core/src/services/onedrive/backend.rs
index bef175735..ba4e5b8d3 100644
--- a/core/src/services/onedrive/backend.rs
+++ b/core/src/services/onedrive/backend.rs
@@ -66,7 +66,7 @@ impl Accessor for OnedriveBackend {
     type BlockingReader = ();
     type Writer = oio::OneShotWriter<OneDriveWriter>;
     type BlockingWriter = ();
-    type Lister = OnedriveLister;
+    type Lister = oio::PageLister<OnedriveLister>;
     type BlockingLister = ();
 
     fn info(&self) -> AccessorInfo {
@@ -163,10 +163,9 @@ impl Accessor for OnedriveBackend {
     }
 
     async fn list(&self, path: &str, _op_list: OpList) -> Result<(RpList, 
Self::Lister)> {
-        let lister: OnedriveLister =
-            OnedriveLister::new(self.root.clone(), path.into(), self.clone());
+        let l = OnedriveLister::new(self.root.clone(), path.into(), 
self.clone());
 
-        Ok((RpList::default(), lister))
+        Ok((RpList::default(), oio::PageLister::new(l)))
     }
 
     async fn create_dir(&self, path: &str, _: OpCreateDir) -> 
Result<RpCreateDir> {
diff --git a/core/src/services/onedrive/lister.rs 
b/core/src/services/onedrive/lister.rs
index 90daeb866..12c557867 100644
--- a/core/src/services/onedrive/lister.rs
+++ b/core/src/services/onedrive/lister.rs
@@ -16,28 +16,19 @@
 // under the License.
 
 use async_trait::async_trait;
-use http::Response;
 
 use super::backend::OnedriveBackend;
 use super::error::parse_error;
 use super::graph_model::GraphApiOnedriveListResponse;
 use super::graph_model::ItemType;
-use crate::raw::build_rel_path;
-use crate::raw::build_rooted_abs_path;
-use crate::raw::new_json_deserialize_error;
-use crate::raw::oio::{self};
-use crate::raw::percent_encode_path;
-use crate::raw::IncomingAsyncBody;
-use crate::EntryMode;
-use crate::Metadata;
-use crate::Result;
+use crate::raw::oio;
+use crate::raw::*;
+use crate::*;
 
 pub struct OnedriveLister {
     root: String,
     path: String,
     backend: OnedriveBackend,
-    next_link: Option<String>,
-    done: bool,
 }
 
 impl OnedriveLister {
@@ -48,92 +39,80 @@ impl OnedriveLister {
             root,
             path,
             backend,
-            next_link: None,
-            done: false,
         }
     }
 }
 
 #[async_trait]
-impl oio::List for OnedriveLister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.done {
-            return Ok(None);
-        }
-        let response = self.onedrive_get().await?;
+impl oio::PageList for OnedriveLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
+        let request_url = if ctx.token.is_empty() {
+            let path = build_rooted_abs_path(&self.root, &self.path);
+            let url: String = if path == "." || path == "/" {
+                
"https://graph.microsoft.com/v1.0/me/drive/root/children".to_string()
+            } else {
+                // According to OneDrive API examples, the path should not end 
with a slash.
+                // Reference: 
<https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_list_children?view=odsp-graph-online>
+                let path = path.strip_suffix('/').unwrap_or("");
+                format!(
+                    
"https://graph.microsoft.com/v1.0/me/drive/root:{}:/children";,
+                    percent_encode_path(path),
+                )
+            };
+            url
+        } else {
+            ctx.token.clone()
+        };
+
+        let resp = self
+            .backend
+            .onedrive_get_next_list_page(&request_url)
+            .await?;
 
-        let status_code = response.status();
+        let status_code = resp.status();
         if !status_code.is_success() {
             if status_code == http::StatusCode::NOT_FOUND {
-                return Ok(None);
+                ctx.done = true;
+                return Ok(());
             }
-            let error = parse_error(response).await?;
+            let error = parse_error(resp).await?;
             return Err(error);
         }
 
-        let bytes = response.into_body().bytes().await?;
+        let bytes = resp.into_body().bytes().await?;
         let decoded_response = 
serde_json::from_slice::<GraphApiOnedriveListResponse>(&bytes)
             .map_err(new_json_deserialize_error)?;
 
         if let Some(next_link) = decoded_response.next_link {
-            self.next_link = Some(next_link);
+            ctx.token = next_link;
         } else {
-            self.done = true;
+            ctx.done = true;
         }
 
-        let entries: Vec<oio::Entry> = decoded_response
-            .value
-            .into_iter()
-            .map(|drive_item| {
-                let name = drive_item.name;
-                let parent_path = drive_item.parent_reference.path;
-                let parent_path = parent_path
-                    .strip_prefix(Self::DRIVE_ROOT_PREFIX)
-                    .unwrap_or("");
-
-                let path = format!("{}/{}", parent_path, name);
-
-                let normalized_path = build_rel_path(&self.root, &path);
-
-                let entry: oio::Entry = match drive_item.item_type {
-                    ItemType::Folder { .. } => {
-                        let normalized_path = format!("{}/", normalized_path);
-                        oio::Entry::new(&normalized_path, 
Metadata::new(EntryMode::DIR))
-                    }
-                    ItemType::File { .. } => {
-                        oio::Entry::new(&normalized_path, 
Metadata::new(EntryMode::FILE))
-                    }
-                };
-                entry
-            })
-            .collect();
-
-        Ok(Some(entries))
-    }
-}
-
-impl OnedriveLister {
-    async fn onedrive_get(&mut self) -> Result<Response<IncomingAsyncBody>> {
-        let request_url = if let Some(next_link) = &self.next_link {
-            let next_link_clone = next_link.clone();
-            self.next_link = None;
-            next_link_clone
-        } else {
-            let path = build_rooted_abs_path(&self.root, &self.path);
-            let url: String = if path == "." || path == "/" {
-                
"https://graph.microsoft.com/v1.0/me/drive/root/children".to_string()
-            } else {
-                // According to OneDrive API examples, the path should not end 
with a slash.
-                // Reference: 
<https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_list_children?view=odsp-graph-online>
-                let path = path.strip_suffix('/').unwrap_or("");
-                format!(
-                    
"https://graph.microsoft.com/v1.0/me/drive/root:{}:/children";,
-                    percent_encode_path(path),
-                )
+        for drive_item in decoded_response.value {
+            let name = drive_item.name;
+            let parent_path = drive_item.parent_reference.path;
+            let parent_path = parent_path
+                .strip_prefix(Self::DRIVE_ROOT_PREFIX)
+                .unwrap_or("");
+
+            let path = format!("{}/{}", parent_path, name);
+
+            let normalized_path = build_rel_path(&self.root, &path);
+
+            let entry: oio::Entry = match drive_item.item_type {
+                ItemType::Folder { .. } => {
+                    let normalized_path = format!("{}/", normalized_path);
+                    oio::Entry::new(&normalized_path, 
Metadata::new(EntryMode::DIR))
+                }
+                ItemType::File { .. } => {
+                    oio::Entry::new(&normalized_path, 
Metadata::new(EntryMode::FILE))
+                }
             };
-            url
-        };
 
-        self.backend.onedrive_get_next_list_page(&request_url).await
+            ctx.entries.push_back(entry)
+        }
+
+        Ok(())
     }
 }
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index c5fc3441c..a9e512429 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -380,7 +380,7 @@ impl Accessor for OssBackend {
     type BlockingReader = ();
     type Writer = OssWriters;
     type BlockingWriter = ();
-    type Lister = OssLister;
+    type Lister = oio::PageLister<OssLister>;
     type BlockingLister = ();
 
     fn info(&self) -> AccessorInfo {
@@ -545,16 +545,14 @@ impl Accessor for OssBackend {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Lister)> {
-        Ok((
-            RpList::default(),
-            OssLister::new(
-                self.core.clone(),
-                path,
-                args.recursive(),
-                args.limit(),
-                args.start_after(),
-            ),
-        ))
+        let l = OssLister::new(
+            self.core.clone(),
+            path,
+            args.recursive(),
+            args.limit(),
+            args.start_after(),
+        );
+        Ok((RpList::default(), oio::PageLister::new(l)))
     }
 
     async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs
index f0feb59d2..c90439de1 100644
--- a/core/src/services/oss/core.rs
+++ b/core/src/services/oss/core.rs
@@ -324,7 +324,7 @@ impl OssCore {
     pub fn oss_list_object_request(
         &self,
         path: &str,
-        token: Option<&str>,
+        token: &str,
         delimiter: &str,
         limit: Option<usize>,
         start_after: Option<String>,
@@ -347,13 +347,9 @@ impl OssCore {
         }
 
         // continuation_token
-        if let Some(continuation_token) = token {
-            write!(
-                url,
-                "&continuation-token={}",
-                percent_encode_path(continuation_token)
-            )
-            .expect("write into string must succeed");
+        if !token.is_empty() {
+            write!(url, "&continuation-token={}", percent_encode_path(token))
+                .expect("write into string must succeed");
         }
 
         // start-after
@@ -446,7 +442,7 @@ impl OssCore {
     pub async fn oss_list_object(
         &self,
         path: &str,
-        token: Option<&str>,
+        token: &str,
         delimiter: &str,
         limit: Option<usize>,
         start_after: Option<String>,
diff --git a/core/src/services/oss/lister.rs b/core/src/services/oss/lister.rs
index 7b4b0bce3..6412cf8bf 100644
--- a/core/src/services/oss/lister.rs
+++ b/core/src/services/oss/lister.rs
@@ -41,9 +41,6 @@ pub struct OssLister {
     /// Filter results to objects whose names are lexicographically
     /// **equal to or after** startOffset
     start_after: Option<String>,
-
-    token: Option<String>,
-    done: bool,
 }
 
 impl OssLister {
@@ -61,28 +58,25 @@ impl OssLister {
             delimiter,
             limit,
             start_after: start_after.map(String::from),
-            token: None,
-
-            done: false,
         }
     }
 }
 
 #[async_trait]
-impl oio::List for OssLister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.done {
-            return Ok(None);
-        }
-
+impl oio::PageList for OssLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
         let resp = self
             .core
             .oss_list_object(
                 &self.path,
-                self.token.as_deref(),
+                &ctx.token,
                 self.delimiter,
                 self.limit,
-                self.start_after.clone(),
+                if ctx.token.is_empty() {
+                    self.start_after.clone()
+                } else {
+                    None
+                },
             )
             .await?;
 
@@ -95,17 +89,15 @@ impl oio::List for OssLister {
         let output: ListBucketOutput = de::from_reader(bs.reader())
             .map_err(|e| Error::new(ErrorKind::Unexpected, "deserialize 
xml").set_source(e))?;
 
-        self.done = !output.is_truncated;
-        self.token = output.next_continuation_token.clone();
-
-        let mut entries = Vec::with_capacity(output.common_prefixes.len() + 
output.contents.len());
+        ctx.done = !output.is_truncated;
+        ctx.token = output.next_continuation_token.unwrap_or_default();
 
         for prefix in output.common_prefixes {
             let de = oio::Entry::new(
                 &build_rel_path(&self.core.root, &prefix.prefix),
                 Metadata::new(EntryMode::DIR),
             );
-            entries.push(de);
+            ctx.entries.push_back(de);
         }
 
         for object in output.contents {
@@ -129,10 +121,10 @@ impl oio::List for OssLister {
             let path = unescape(&rel)
                 .map_err(|e| Error::new(ErrorKind::Unexpected, "excapse 
xml").set_source(e))?;
             let de = oio::Entry::new(&path, meta);
-            entries.push(de);
+            ctx.entries.push_back(de);
         }
 
-        Ok(Some(entries))
+        Ok(())
     }
 }
 
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 7e1ccfc85..56663f522 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -969,7 +969,7 @@ impl Accessor for S3Backend {
     type BlockingReader = ();
     type Writer = S3Writers;
     type BlockingWriter = ();
-    type Lister = S3Lister;
+    type Lister = oio::PageLister<S3Lister>;
     type BlockingLister = ();
 
     fn info(&self) -> AccessorInfo {
@@ -1134,16 +1134,14 @@ impl Accessor for S3Backend {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Lister)> {
-        Ok((
-            RpList::default(),
-            S3Lister::new(
-                self.core.clone(),
-                path,
-                args.recursive(),
-                args.limit(),
-                args.start_after(),
-            ),
-        ))
+        let l = S3Lister::new(
+            self.core.clone(),
+            path,
+            args.recursive(),
+            args.limit(),
+            args.start_after(),
+        );
+        Ok((RpList::default(), oio::PageLister::new(l)))
     }
 
     async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
diff --git a/core/src/services/s3/lister.rs b/core/src/services/s3/lister.rs
index 431abd8f1..78d5e1ff3 100644
--- a/core/src/services/s3/lister.rs
+++ b/core/src/services/s3/lister.rs
@@ -38,9 +38,6 @@ pub struct S3Lister {
 
     /// Amazon S3 starts listing **after** this specified key
     start_after: Option<String>,
-
-    token: String,
-    done: bool,
 }
 
 impl S3Lister {
@@ -59,28 +56,26 @@ impl S3Lister {
             delimiter,
             limit,
             start_after: start_after.map(String::from),
-
-            token: "".to_string(),
-            done: false,
         }
     }
 }
 
 #[async_trait]
-impl oio::List for S3Lister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.done {
-            return Ok(None);
-        }
-
+impl oio::PageList for S3Lister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
         let resp = self
             .core
             .s3_list_objects(
                 &self.path,
-                &self.token,
+                &ctx.token,
                 self.delimiter,
                 self.limit,
-                self.start_after.clone(),
+                // State after should only be set for the first page.
+                if ctx.token.is_empty() {
+                    self.start_after.clone()
+                } else {
+                    None
+                },
             )
             .await?;
 
@@ -97,16 +92,14 @@ impl oio::List for S3Lister {
         // - Check `is_truncated`
         // - Check `next_continuation_token`
         // - Check the length of `common_prefixes` and `contents` (very rarely 
case)
-        self.done = if let Some(is_truncated) = output.is_truncated {
+        ctx.done = if let Some(is_truncated) = output.is_truncated {
             !is_truncated
         } else if let Some(next_continuation_token) = 
output.next_continuation_token.as_ref() {
             next_continuation_token.is_empty()
         } else {
             output.common_prefixes.is_empty() && output.contents.is_empty()
         };
-        self.token = 
output.next_continuation_token.clone().unwrap_or_default();
-
-        let mut entries = Vec::with_capacity(output.common_prefixes.len() + 
output.contents.len());
+        ctx.token = output.next_continuation_token.clone().unwrap_or_default();
 
         for prefix in output.common_prefixes {
             let de = oio::Entry::new(
@@ -114,7 +107,7 @@ impl oio::List for S3Lister {
                 Metadata::new(EntryMode::DIR),
             );
 
-            entries.push(de);
+            ctx.entries.push_back(de);
         }
 
         for object in output.contents {
@@ -139,10 +132,10 @@ impl oio::List for S3Lister {
 
             let de = oio::Entry::new(&build_rel_path(&self.core.root, 
&object.key), meta);
 
-            entries.push(de);
+            ctx.entries.push_back(de);
         }
 
-        Ok(Some(entries))
+        Ok(())
     }
 }
 
diff --git a/core/src/services/sftp/lister.rs b/core/src/services/sftp/lister.rs
index b130629ca..e69f92213 100644
--- a/core/src/services/sftp/lister.rs
+++ b/core/src/services/sftp/lister.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::pin::Pin;
+use std::task::{ready, Context, Poll};
 
 use async_trait::async_trait;
 use futures::StreamExt;
@@ -47,24 +48,18 @@ impl SftpLister {
 
 #[async_trait]
 impl oio::List for SftpLister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.limit == 0 {
-            return Ok(None);
-        }
-
-        let item = self.dir.next().await;
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Result<Option<oio::Entry>>> {
+        let item = ready!(self.dir.poll_next_unpin(cx)).transpose()?;
 
         match item {
-            Some(Ok(e)) => {
+            Some(e) => {
                 if e.filename().to_str() == Some(".") || e.filename().to_str() 
== Some("..") {
-                    self.next().await
+                    self.poll_next(cx)
                 } else {
-                    self.limit -= 1;
-                    Ok(Some(vec![map_entry(self.prefix.as_str(), e.clone())]))
+                    Poll::Ready(Ok(Some(map_entry(self.prefix.as_str(), e))))
                 }
             }
-            Some(Err(e)) => Err(e.into()),
-            None => Ok(None),
+            None => Poll::Ready(Ok(None)),
         }
     }
 }
diff --git a/core/src/services/swift/backend.rs 
b/core/src/services/swift/backend.rs
index 6f6b22dba..9fa58b7dd 100644
--- a/core/src/services/swift/backend.rs
+++ b/core/src/services/swift/backend.rs
@@ -218,7 +218,7 @@ impl Accessor for SwiftBackend {
     type BlockingReader = ();
     type Writer = oio::OneShotWriter<SwiftWriter>;
     type BlockingWriter = ();
-    type Lister = SwiftLister;
+    type Lister = oio::PageLister<SwiftLister>;
     type BlockingLister = ();
 
     fn info(&self) -> AccessorInfo {
@@ -329,8 +329,8 @@ impl Accessor for SwiftBackend {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Lister)> {
-        let op = SwiftLister::new(self.core.clone(), path.to_string(), 
args.recursive());
+        let l = SwiftLister::new(self.core.clone(), path.to_string(), 
args.recursive());
 
-        Ok((RpList::default(), op))
+        Ok((RpList::default(), oio::PageLister::new(l)))
     }
 }
diff --git a/core/src/services/swift/lister.rs 
b/core/src/services/swift/lister.rs
index d69bb42b5..7fdfa0d5b 100644
--- a/core/src/services/swift/lister.rs
+++ b/core/src/services/swift/lister.rs
@@ -29,7 +29,6 @@ pub struct SwiftLister {
     core: Arc<SwiftCore>,
     path: String,
     delimiter: &'static str,
-    done: bool,
 }
 
 impl SwiftLister {
@@ -39,18 +38,13 @@ impl SwiftLister {
             core,
             path,
             delimiter,
-            done: false,
         }
     }
 }
 
 #[async_trait]
-impl oio::List for SwiftLister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.done {
-            return Ok(None);
-        }
-
+impl oio::PageList for SwiftLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
         let response = self.core.swift_list(&self.path, self.delimiter).await?;
 
         let status_code = response.status();
@@ -60,15 +54,13 @@ impl oio::List for SwiftLister {
             return Err(error);
         }
 
+        ctx.done = true;
+
         let bytes = response.into_body().bytes().await?;
         let mut decoded_response = 
serde_json::from_slice::<Vec<ListOpResponse>>(&bytes)
             .map_err(new_json_deserialize_error)?;
 
-        self.done = true;
-
-        let mut entries = Vec::with_capacity(decoded_response.len());
-
-        while let Some(status) = decoded_response.pop() {
+        for status in decoded_response {
             let entry: oio::Entry = match status {
                 ListOpResponse::Subdir { subdir } => {
                     let meta = Metadata::new(EntryMode::DIR);
@@ -99,9 +91,9 @@ impl oio::List for SwiftLister {
                     oio::Entry::new(&name, meta)
                 }
             };
-            entries.push(entry);
+            ctx.entries.push_back(entry);
         }
-        Ok(Some(entries))
+        Ok(())
     }
 }
 
diff --git a/core/src/services/webdav/backend.rs 
b/core/src/services/webdav/backend.rs
index 09e0f839a..aa676c44c 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -223,7 +223,7 @@ impl Accessor for WebdavBackend {
     type BlockingReader = ();
     type Writer = oio::OneShotWriter<WebdavWriter>;
     type BlockingWriter = ();
-    type Lister = Option<WebdavLister>;
+    type Lister = Option<oio::PageLister<WebdavLister>>;
     type BlockingLister = ();
 
     fn info(&self) -> AccessorInfo {
@@ -398,10 +398,9 @@ impl Accessor for WebdavBackend {
                 let result: Multistatus =
                     
quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
 
-                Ok((
-                    RpList::default(),
-                    Some(WebdavLister::new(&self.base_dir, &self.root, path, 
result)),
-                ))
+                let l = WebdavLister::new(&self.base_dir, &self.root, path, 
result);
+
+                Ok((RpList::default(), Some(oio::PageLister::new(l))))
             }
             StatusCode::NOT_FOUND if path.ends_with('/') => 
Ok((RpList::default(), None)),
             _ => Err(parse_error(resp).await?),
diff --git a/core/src/services/webdav/lister.rs 
b/core/src/services/webdav/lister.rs
index 84e97aca6..e98c77db3 100644
--- a/core/src/services/webdav/lister.rs
+++ b/core/src/services/webdav/lister.rs
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::mem;
-
 use async_trait::async_trait;
 use serde::Deserialize;
 
@@ -30,6 +28,7 @@ pub struct WebdavLister {
 }
 
 impl WebdavLister {
+    /// TODO: sending request in `next_page` instead of in `new`.
     pub fn new(base_dir: &str, root: &str, path: &str, multistates: 
Multistatus) -> Self {
         Self {
             base_dir: base_dir.to_string(),
@@ -41,14 +40,10 @@ impl WebdavLister {
 }
 
 #[async_trait]
-impl oio::List for WebdavLister {
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.multistates.response.is_empty() {
-            return Ok(None);
-        };
-        let oes = mem::take(&mut self.multistates.response);
-
-        let mut entries = Vec::with_capacity(oes.len());
+impl oio::PageList for WebdavLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
+        // Build request instead of clone here.
+        let oes = self.multistates.response.clone();
 
         for res in oes {
             let path = res
@@ -70,19 +65,20 @@ impl oio::List for WebdavLister {
             }
 
             let meta = res.parse_into_metadata()?;
-            entries.push(oio::Entry::new(&decoded_path, meta))
+            ctx.entries.push_back(oio::Entry::new(&decoded_path, meta))
         }
+        ctx.done = true;
 
-        Ok(Some(entries))
+        Ok(())
     }
 }
 
-#[derive(Deserialize, Debug, PartialEq, Eq)]
+#[derive(Deserialize, Debug, PartialEq, Eq, Clone)]
 pub struct Multistatus {
     pub response: Vec<ListOpResponse>,
 }
 
-#[derive(Deserialize, Debug, PartialEq, Eq)]
+#[derive(Deserialize, Debug, PartialEq, Eq, Clone)]
 pub struct ListOpResponse {
     pub href: String,
     pub propstat: Propstat,
@@ -141,13 +137,13 @@ impl ListOpResponse {
     }
 }
 
-#[derive(Deserialize, Debug, PartialEq, Eq)]
+#[derive(Deserialize, Debug, PartialEq, Eq, Clone)]
 pub struct Propstat {
     pub prop: Prop,
     pub status: String,
 }
 
-#[derive(Deserialize, Debug, PartialEq, Eq)]
+#[derive(Deserialize, Debug, PartialEq, Eq, Clone)]
 pub struct Prop {
     #[serde(default)]
     pub displayname: String,
@@ -158,13 +154,13 @@ pub struct Prop {
     pub resourcetype: ResourceTypeContainer,
 }
 
-#[derive(Deserialize, Debug, PartialEq, Eq)]
+#[derive(Deserialize, Debug, PartialEq, Eq, Clone)]
 pub struct ResourceTypeContainer {
     #[serde(rename = "$value")]
     pub value: Option<ResourceType>,
 }
 
-#[derive(Deserialize, Debug, PartialEq, Eq)]
+#[derive(Deserialize, Debug, PartialEq, Eq, Clone)]
 #[serde(rename_all = "lowercase")]
 pub enum ResourceType {
     Collection,
diff --git a/core/src/services/webhdfs/backend.rs 
b/core/src/services/webhdfs/backend.rs
index 3352e47d0..9fa9bd150 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -31,10 +31,8 @@ use super::error::parse_error;
 use super::error::parse_error_msg;
 use super::lister::WebhdfsLister;
 use super::message::BooleanResp;
-use super::message::DirectoryListingWrapper;
 use super::message::FileStatusType;
 use super::message::FileStatusWrapper;
-use super::message::FileStatusesWrapper;
 use super::writer::WebhdfsWriter;
 use crate::raw::*;
 use crate::*;
@@ -277,7 +275,10 @@ impl WebhdfsBackend {
         Ok(req)
     }
 
-    fn webhdfs_list_status_request(&self, path: &str) -> 
Result<Request<AsyncBody>> {
+    pub async fn webhdfs_list_status_request(
+        &self,
+        path: &str,
+    ) -> Result<Response<IncomingAsyncBody>> {
         let p = build_abs_path(&self.root, path);
         let mut url = format!(
             "{}/webhdfs/v1/{}?op=LISTSTATUS",
@@ -291,29 +292,24 @@ impl WebhdfsBackend {
         let req = Request::get(&url)
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
-        Ok(req)
+        self.client.send(req).await
     }
 
-    pub(super) fn webhdfs_list_status_batch_request(
+    pub async fn webhdfs_list_status_batch_request(
         &self,
         path: &str,
-        args: &OpList,
-    ) -> Result<Request<AsyncBody>> {
+        start_after: &str,
+    ) -> Result<Response<IncomingAsyncBody>> {
         let p = build_abs_path(&self.root, path);
 
-        // if it's not the first time to call LISTSTATUS_BATCH, we will add 
&startAfter=<CHILD>
-        let start_after_param = match args.start_after() {
-            Some(sa) if sa.is_empty() => String::new(),
-            Some(sa) => format!("&startAfter={}", sa),
-            None => String::new(),
-        };
-
         let mut url = format!(
-            "{}/webhdfs/v1/{}?op=LISTSTATUS_BATCH{}",
+            "{}/webhdfs/v1/{}?op=LISTSTATUS_BATCH",
             self.endpoint,
             percent_encode_path(&p),
-            start_after_param
         );
+        if !start_after.is_empty() {
+            url += format!("&startAfter={}", start_after).as_str();
+        }
         if let Some(auth) = &self.auth {
             url += format!("&{auth}").as_str();
         }
@@ -321,7 +317,7 @@ impl WebhdfsBackend {
         let req = Request::get(&url)
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
-        Ok(req)
+        self.client.send(req).await
     }
 
     async fn webhdfs_read_file(
@@ -402,7 +398,7 @@ impl Accessor for WebhdfsBackend {
     type BlockingReader = ();
     type Writer = oio::OneShotWriter<WebhdfsWriter>;
     type BlockingWriter = ();
-    type Lister = WebhdfsLister;
+    type Lister = oio::PageLister<WebhdfsLister>;
     type BlockingLister = ();
 
     fn info(&self) -> AccessorInfo {
@@ -549,46 +545,7 @@ impl Accessor for WebhdfsBackend {
         }
 
         let path = path.trim_end_matches('/');
-
-        if !self.disable_list_batch {
-            let req = self.webhdfs_list_status_batch_request(path, 
&OpList::default())?;
-            let resp = self.client.send(req).await?;
-            match resp.status() {
-                StatusCode::OK => {
-                    let bs = resp.into_body().bytes().await?;
-                    let directory_listing = 
serde_json::from_slice::<DirectoryListingWrapper>(&bs)
-                        .map_err(new_json_deserialize_error)?
-                        .directory_listing;
-                    let file_statuses = 
directory_listing.partial_listing.file_statuses.file_status;
-                    let mut objects = WebhdfsLister::new(self.clone(), path, 
file_statuses);
-                    
objects.set_remaining_entries(directory_listing.remaining_entries);
-                    Ok((RpList::default(), objects))
-                }
-                StatusCode::NOT_FOUND => {
-                    let objects = WebhdfsLister::new(self.clone(), path, 
vec![]);
-                    Ok((RpList::default(), objects))
-                }
-                _ => Err(parse_error(resp).await?),
-            }
-        } else {
-            let req = self.webhdfs_list_status_request(path)?;
-            let resp = self.client.send(req).await?;
-            match resp.status() {
-                StatusCode::OK => {
-                    let bs = resp.into_body().bytes().await?;
-                    let file_statuses = 
serde_json::from_slice::<FileStatusesWrapper>(&bs)
-                        .map_err(new_json_deserialize_error)?
-                        .file_statuses
-                        .file_status;
-                    let objects = WebhdfsLister::new(self.clone(), path, 
file_statuses);
-                    Ok((RpList::default(), objects))
-                }
-                StatusCode::NOT_FOUND => {
-                    let objects = WebhdfsLister::new(self.clone(), path, 
vec![]);
-                    Ok((RpList::default(), objects))
-                }
-                _ => Err(parse_error(resp).await?),
-            }
-        }
+        let l = WebhdfsLister::new(self.clone(), path);
+        Ok((RpList::default(), oio::PageLister::new(l)))
     }
 }
diff --git a/core/src/services/webhdfs/lister.rs 
b/core/src/services/webhdfs/lister.rs
index 8c6579a5d..0857414fc 100644
--- a/core/src/services/webhdfs/lister.rs
+++ b/core/src/services/webhdfs/lister.rs
@@ -20,90 +20,75 @@ use http::StatusCode;
 
 use super::backend::WebhdfsBackend;
 use super::error::parse_error;
-use super::message::DirectoryListingWrapper;
-use super::message::FileStatus;
-use super::message::FileStatusType;
+use super::message::*;
 use crate::raw::*;
 use crate::*;
 
 pub struct WebhdfsLister {
     backend: WebhdfsBackend,
     path: String,
-    statuses: Vec<FileStatus>,
-    batch_start_after: Option<String>,
-    remaining_entries: u32,
 }
 
 impl WebhdfsLister {
-    pub fn new(backend: WebhdfsBackend, path: &str, statuses: Vec<FileStatus>) 
-> Self {
+    pub fn new(backend: WebhdfsBackend, path: &str) -> Self {
         Self {
             backend,
             path: path.to_string(),
-            batch_start_after: statuses.last().map(|f| f.path_suffix.clone()),
-            statuses,
-            remaining_entries: 0,
         }
     }
-
-    pub(super) fn set_remaining_entries(&mut self, remaining_entries: u32) {
-        self.remaining_entries = remaining_entries;
-    }
 }
 
 #[async_trait]
-impl oio::List for WebhdfsLister {
-    /// Returns the next page of entries.
-    ///
-    /// Note: default list status with batch, calling next will query for next 
batch if `remaining_entries` > 0.
-    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
-        if self.statuses.is_empty() && self.remaining_entries == 0 {
-            return Ok(None);
-        }
+impl oio::PageList for WebhdfsLister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
+        let file_status = if self.backend.disable_list_batch {
+            let resp = 
self.backend.webhdfs_list_status_request(&self.path).await?;
+            match resp.status() {
+                StatusCode::OK => {
+                    ctx.done = true;
 
-        return match self.backend.disable_list_batch {
-            true => self.webhdfs_get_next_list_statuses(),
-            false => {
-                let args = OpList::with_start_after(
-                    OpList::default(),
-                    &self.batch_start_after.clone().unwrap(),
-                );
-                let req = self
-                    .backend
-                    .webhdfs_list_status_batch_request(&self.path, &args)?;
-                let resp = self.backend.client.send(req).await?;
+                    let bs = resp.into_body().bytes().await?;
+                    serde_json::from_slice::<FileStatusesWrapper>(&bs)
+                        .map_err(new_json_deserialize_error)?
+                        .file_statuses
+                        .file_status
+                }
+                StatusCode::NOT_FOUND => {
+                    ctx.done = true;
+                    return Ok(());
+                }
+                _ => return Err(parse_error(resp).await?),
+            }
+        } else {
+            let resp = self
+                .backend
+                .webhdfs_list_status_batch_request(&self.path, &ctx.token)
+                .await?;
+            match resp.status() {
+                StatusCode::OK => {
+                    let bs = resp.into_body().bytes().await?;
+                    let directory_listing = 
serde_json::from_slice::<DirectoryListingWrapper>(&bs)
+                        .map_err(new_json_deserialize_error)?
+                        .directory_listing;
+                    let file_statuses = 
directory_listing.partial_listing.file_statuses.file_status;
 
-                match resp.status() {
-                    StatusCode::OK => {
-                        let bs = resp.into_body().bytes().await?;
-                        let directory_listing =
-                            
serde_json::from_slice::<DirectoryListingWrapper>(&bs)
-                                .map_err(new_json_deserialize_error)?;
-                        let file_statuses = directory_listing
-                            .directory_listing
-                            .partial_listing
-                            .file_statuses
-                            .file_status;
-                        self.remaining_entries =
-                            
directory_listing.directory_listing.remaining_entries;
-                        self.batch_start_after =
-                            file_statuses.last().map(|f| 
f.path_suffix.clone());
-                        self.statuses.extend(file_statuses);
-                        self.webhdfs_get_next_list_statuses()
+                    if directory_listing.remaining_entries == 0 {
+                        ctx.done = true;
+                    } else if file_statuses.len() > 0 {
+                        ctx.token = 
file_statuses.last().unwrap().path_suffix.clone();
                     }
-                    StatusCode::NOT_FOUND => 
self.webhdfs_get_next_list_statuses(),
-                    _ => Err(parse_error(resp).await?),
+
+                    file_statuses
                 }
+                StatusCode::NOT_FOUND => {
+                    ctx.done = true;
+                    return Ok(());
+                }
+                _ => return Err(parse_error(resp).await?),
             }
         };
-    }
-}
 
-impl WebhdfsLister {
-    /// Returns the next page of entries.
-    fn webhdfs_get_next_list_statuses(&mut self) -> 
Result<Option<Vec<oio::Entry>>> {
-        let mut entries = Vec::with_capacity(self.statuses.len());
-
-        while let Some(status) = self.statuses.pop() {
+        for status in file_status {
             let mut path = if self.path.is_empty() {
                 status.path_suffix.to_string()
             } else {
@@ -126,8 +111,9 @@ impl WebhdfsLister {
                 path += "/"
             }
             let entry = oio::Entry::new(&path, meta);
-            entries.push(entry);
+            ctx.entries.push_back(entry);
         }
-        Ok(Some(entries))
+
+        Ok(())
     }
 }
diff --git a/core/src/types/list.rs b/core/src/types/list.rs
index ac01496b3..5493d58db 100644
--- a/core/src/types/list.rs
+++ b/core/src/types/list.rs
@@ -26,11 +26,10 @@ use futures::future::BoxFuture;
 use futures::FutureExt;
 use futures::Stream;
 
+use crate::raw::oio::List;
 use crate::raw::*;
 use crate::*;
 
-/// Future constructed by listing.
-type ListFuture = BoxFuture<'static, (oio::Lister, 
Result<Option<Vec<oio::Entry>>>)>;
 /// Future constructed by stating.
 type StatFuture = BoxFuture<'static, (String, Result<RpStat>)>;
 
@@ -42,12 +41,10 @@ type StatFuture = BoxFuture<'static, (String, 
Result<RpStat>)>;
 /// User can use lister as `Stream<Item = Result<Entry>>`.
 pub struct Lister {
     acc: FusedAccessor,
+    lister: oio::Lister,
     /// required_metakey is the metakey required by users.
     required_metakey: FlagSet<Metakey>,
 
-    buf: VecDeque<oio::Entry>,
-    lister: Option<oio::Lister>,
-    listing: Option<ListFuture>,
     stating: Option<StatFuture>,
 }
 
@@ -64,11 +61,9 @@ impl Lister {
 
         Ok(Self {
             acc,
+            lister,
             required_metakey,
 
-            buf: VecDeque::new(),
-            lister: Some(lister),
-            listing: None,
             stating: None,
         })
     }
@@ -83,52 +78,39 @@ impl Stream for Lister {
 
             // Make sure we will not poll this future again.
             self.stating = None;
-            let metadata = rp?.into_metadata();
+            // TODO: we should rebuild the future if stat failed.
+            let metadata = match rp {
+                Ok(rp) => rp.into_metadata(),
+                Err(err) => {
+                    let acc = self.acc.clone();
+                    let fut = async move {
+                        let res = acc.stat(&path, OpStat::default()).await;
+
+                        (path, res)
+                    };
+                    self.stating = Some(Box::pin(fut));
+                    return Poll::Ready(Some(Err(err)));
+                }
+            };
 
             return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
         }
 
-        if let Some(oe) = self.buf.pop_front() {
-            let (path, metadata) = oe.into_entry().into_parts();
-            // TODO: we can optimize this by checking the provided metakey 
provided by services.
-            if metadata.contains_metakey(self.required_metakey) {
-                return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
-            }
-
-            let acc = self.acc.clone();
-            let fut = async move {
-                let res = acc.stat(&path, OpStat::default()).await;
-
-                (path, res)
-            };
-            self.stating = Some(Box::pin(fut));
-            return self.poll_next(cx);
-        }
-
-        if let Some(fut) = self.listing.as_mut() {
-            let (op, res) = ready!(fut.poll_unpin(cx));
-
-            // Make sure we will not poll this future again.
-            self.listing = None;
-
-            return match res? {
-                Some(oes) => {
-                    self.lister = Some(op);
-                    self.buf = oes.into();
-                    self.poll_next(cx)
+        match ready!(self.lister.poll_next(cx))? {
+            Some(oe) => {
+                let (path, metadata) = oe.into_entry().into_parts();
+                // TODO: we can optimize this by checking the provided metakey 
provided by services.
+                if metadata.contains_metakey(self.required_metakey) {
+                    return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
                 }
-                None => Poll::Ready(None),
-            };
-        }
 
-        match self.lister.take() {
-            Some(mut lister) => {
+                let acc = self.acc.clone();
                 let fut = async move {
-                    let res = lister.next().await;
+                    let res = acc.stat(&path, OpStat::default()).await;
 
-                    (lister, res)
+                    (path, res)
                 };
-                self.listing = Some(Box::pin(fut));
+                self.stating = Some(Box::pin(fut));
                 self.poll_next(cx)
             }
             None => Poll::Ready(None),

Reply via email to