This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e8c92e5f fix(core): Invalid lister should not panic nor endless loop (#2931)
9e8c92e5f is described below

commit 9e8c92e5f128db32d9c42938e9cd7af48d0cf71d
Author: Xuanwo <[email protected]>
AuthorDate: Fri Aug 25 13:36:45 2023 +0800

    fix(core): Invalid lister should not panic nor endless loop (#2931)
    
    fix(core): Invalid lister should not panic nor endless loop
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/types/list.rs | 72 ++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 58 insertions(+), 14 deletions(-)

diff --git a/core/src/types/list.rs b/core/src/types/list.rs
index 514056dba..6e67aa430 100644
--- a/core/src/types/list.rs
+++ b/core/src/types/list.rs
@@ -80,9 +80,11 @@ impl Stream for Lister {
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         if let Some(fut) = self.stating.as_mut() {
             let (path, rp) = ready!(fut.poll_unpin(cx));
-            let metadata = rp?.into_metadata();
 
+            // Make sure we will not poll this future again.
             self.stating = None;
+            let metadata = rp?.into_metadata();
+
             return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
         }
 
@@ -106,29 +108,32 @@ impl Stream for Lister {
 
         if let Some(fut) = self.listing.as_mut() {
             let (op, res) = ready!(fut.poll_unpin(cx));
-            self.pager = Some(op);
+
+            // Make sure we will not poll this future again.
+            self.listing = None;
 
             return match res? {
                 Some(oes) => {
-                    self.listing = None;
+                    self.pager = Some(op);
                     self.buf = oes.into();
                     self.poll_next(cx)
                 }
-                None => {
-                    self.listing = None;
-                    Poll::Ready(None)
-                }
+                None => Poll::Ready(None),
             };
         }
 
-        let mut pager = self.pager.take().expect("pager must be valid");
-        let fut = async move {
-            let res = pager.next().await;
+        match self.pager.take() {
+            Some(mut pager) => {
+                let fut = async move {
+                    let res = pager.next().await;
 
-            (pager, res)
-        };
-        self.listing = Some(Box::pin(fut));
-        self.poll_next(cx)
+                    (pager, res)
+                };
+                self.listing = Some(Box::pin(fut));
+                self.poll_next(cx)
+            }
+            None => Poll::Ready(None),
+        }
     }
 }
 
@@ -177,3 +182,42 @@ impl Iterator for BlockingLister {
         self.next()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::services::Azblob;
+    use futures::future;
+    use futures::StreamExt;
+
+    /// Inspired by <https://gist.github.com/kyle-mccarthy/1e6ae89cc34495d731b91ebf5eb5a3d9>
+    ///
+    /// Invalid lister should not panic nor endless loop.
+    #[tokio::test]
+    async fn test_invalid_lister() -> Result<()> {
+        let mut builder = Azblob::default();
+
+        builder
+            .container("container")
+            .account_name("account_name")
+            .account_key("account_key")
+            .endpoint("https://account_name.blob.core.windows.net");
+
+        let operator = Operator::new(builder)?.finish();
+
+        let lister = operator.lister("/").await?;
+
+        lister
+            .filter_map(|entry| {
+                dbg!(&entry);
+                future::ready(entry.ok())
+            })
+            .for_each(|entry| {
+                println!("{:?}", entry);
+                future::ready(())
+            })
+            .await;
+
+        Ok(())
+    }
+}

Reply via email to