This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch polish-lister
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 4f7f7560cc0f81bf38488d63983e621e6ade76bc
Author: Xuanwo <[email protected]>
AuthorDate: Wed Nov 15 23:34:15 2023 +0800

    polish errored
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/types/list.rs | 57 ++++++++++++++++++++++++++++++++++----------------
 1 file changed, 39 insertions(+), 18 deletions(-)

diff --git a/core/src/types/list.rs b/core/src/types/list.rs
index 4b7813139..ce511155a 100644
--- a/core/src/types/list.rs
+++ b/core/src/types/list.rs
@@ -35,9 +35,10 @@ type StatFuture = BoxFuture<'static, (String, 
Result<RpStat>)>;
 /// Lister is designed to list entries at given path in an asynchronous
 /// manner.
 ///
-/// Users can construct Lister by [`Operator::lister`].
+/// Users can construct Lister by [`Operator::lister`] or 
[`Operator::lister_with`].
 ///
-/// User can use lister as `Stream<Item = Result<Entry>>`.
+/// - Lister implements `Stream<Item = Result<Entry>>`.
+/// - Lister will return `None` if there is no more entries or error has been 
returned.
 pub struct Lister {
     acc: FusedAccessor,
     lister: oio::Lister,
@@ -45,6 +46,7 @@ pub struct Lister {
     required_metakey: FlagSet<Metakey>,
 
     stating: Option<StatFuture>,
+    errored: bool,
 }
 
 /// # Safety
@@ -64,6 +66,7 @@ impl Lister {
             required_metakey,
 
             stating: None,
+            errored: false,
         })
     }
 }
@@ -72,22 +75,20 @@ impl Stream for Lister {
     type Item = Result<Entry>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
+        // Returns `None` if we have errored.
+        if self.errored {
+            return Poll::Ready(None);
+        }
+
         if let Some(fut) = self.stating.as_mut() {
             let (path, rp) = ready!(fut.poll_unpin(cx));
 
             // Make sure we will not poll this future again.
             self.stating = None;
-            // TODO: we should rebuild the future if stat failed.
             let metadata = match rp {
                 Ok(rp) => rp.into_metadata(),
                 Err(err) => {
-                    let acc = self.acc.clone();
-                    let fut = async move {
-                        let res = acc.stat(&path, OpStat::default()).await;
-
-                        (path, res)
-                    };
-                    self.stating = Some(Box::pin(fut));
+                    self.errored = true;
                     return Poll::Ready(Some(Err(err)));
                 }
             };
@@ -95,8 +96,8 @@ impl Stream for Lister {
             return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
         }
 
-        match ready!(self.lister.poll_next(cx))? {
-            Some(oe) => {
+        match ready!(self.lister.poll_next(cx)) {
+            Ok(Some(oe)) => {
                 let (path, metadata) = oe.into_entry().into_parts();
                 // TODO: we can optimize this by checking the provided metakey 
provided by services.
                 if metadata.contains_metakey(self.required_metakey) {
@@ -112,7 +113,11 @@ impl Stream for Lister {
                 self.stating = Some(Box::pin(fut));
                 self.poll_next(cx)
             }
-            None => Poll::Ready(None),
+            Ok(None) => Poll::Ready(None),
+            Err(err) => {
+                self.errored = true;
+                Poll::Ready(Some(Err(err)))
+            }
         }
     }
 }
@@ -120,13 +125,17 @@ impl Stream for Lister {
 /// BlockingLister is designed to list entries at given path in a blocking
 /// manner.
 ///
-/// Users can construct Lister by `blocking_lister`.
+/// Users can construct Lister by [`BlockingOperator::lister`] or 
[`BlockingOperator::lister_with`].
+///
+/// - Lister implements `Iterator<Item = Result<Entry>>`.
+/// - Lister will return `None` if there is no more entries or error has been 
returned.
 pub struct BlockingLister {
     acc: FusedAccessor,
     /// required_metakey is the metakey required by users.
     required_metakey: FlagSet<Metakey>,
 
     lister: oio::BlockingLister,
+    errored: bool,
 }
 
 /// # Safety
@@ -145,6 +154,7 @@ impl BlockingLister {
             required_metakey,
 
             lister,
+            errored: false,
         })
     }
 }
@@ -154,12 +164,18 @@ impl Iterator for BlockingLister {
     type Item = Result<Entry>;
 
     fn next(&mut self) -> Option<Self::Item> {
+        // Returns `None` if we have errored.
+        if self.errored {
+            return None;
+        }
+
         let entry = match self.lister.next() {
             Ok(Some(entry)) => entry,
-            Ok(None) => {
-                return None;
+            Ok(None) => return None,
+            Err(err) => {
+                self.errored = true;
+                return Some(Err(err));
             }
-            Err(err) => return Some(Err(err)),
         };
 
         let (path, metadata) = entry.into_entry().into_parts();
@@ -170,7 +186,10 @@ impl Iterator for BlockingLister {
 
         let metadata = match self.acc.blocking_stat(&path, OpStat::default()) {
             Ok(rp) => rp.into_metadata(),
-            Err(err) => return Some(Err(err)),
+            Err(err) => {
+                self.errored = true;
+                return Some(Err(err));
+            }
         };
         Some(Ok(Entry::new(path, metadata)))
     }
@@ -189,6 +208,8 @@ mod tests {
     /// Invalid lister should not panic nor endless loop.
     #[tokio::test]
     async fn test_invalid_lister() -> Result<()> {
+        let _ = tracing_subscriber::fmt().try_init();
+
         let mut builder = Azblob::default();
 
         builder

Reply via email to