This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 9e8c92e5f fix(core): Invalid lister should not panic nor endless loop
(#2931)
9e8c92e5f is described below
commit 9e8c92e5f128db32d9c42938e9cd7af48d0cf71d
Author: Xuanwo <[email protected]>
AuthorDate: Fri Aug 25 13:36:45 2023 +0800
fix(core): Invalid lister should not panic nor endless loop (#2931)
fix(core): Invalid lister should not panice nor endless loop
Signed-off-by: Xuanwo <[email protected]>
---
core/src/types/list.rs | 72 ++++++++++++++++++++++++++++++++++++++++----------
1 file changed, 58 insertions(+), 14 deletions(-)
diff --git a/core/src/types/list.rs b/core/src/types/list.rs
index 514056dba..6e67aa430 100644
--- a/core/src/types/list.rs
+++ b/core/src/types/list.rs
@@ -80,9 +80,11 @@ impl Stream for Lister {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
if let Some(fut) = self.stating.as_mut() {
let (path, rp) = ready!(fut.poll_unpin(cx));
- let metadata = rp?.into_metadata();
+ // Make sure we will not poll this future again.
self.stating = None;
+ let metadata = rp?.into_metadata();
+
return Poll::Ready(Some(Ok(Entry::new(path, metadata))));
}
@@ -106,29 +108,32 @@ impl Stream for Lister {
if let Some(fut) = self.listing.as_mut() {
let (op, res) = ready!(fut.poll_unpin(cx));
- self.pager = Some(op);
+
+ // Make sure we will not poll this future again.
+ self.listing = None;
return match res? {
Some(oes) => {
- self.listing = None;
+ self.pager = Some(op);
self.buf = oes.into();
self.poll_next(cx)
}
- None => {
- self.listing = None;
- Poll::Ready(None)
- }
+ None => Poll::Ready(None),
};
}
- let mut pager = self.pager.take().expect("pager must be valid");
- let fut = async move {
- let res = pager.next().await;
+ match self.pager.take() {
+ Some(mut pager) => {
+ let fut = async move {
+ let res = pager.next().await;
- (pager, res)
- };
- self.listing = Some(Box::pin(fut));
- self.poll_next(cx)
+ (pager, res)
+ };
+ self.listing = Some(Box::pin(fut));
+ self.poll_next(cx)
+ }
+ None => Poll::Ready(None),
+ }
}
}
@@ -177,3 +182,42 @@ impl Iterator for BlockingLister {
self.next()
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::services::Azblob;
+ use futures::future;
+ use futures::StreamExt;
+
+ /// Inspired by
<https://gist.github.com/kyle-mccarthy/1e6ae89cc34495d731b91ebf5eb5a3d9>
+ ///
+ /// Invalid lister should not panic nor endless loop.
+ #[tokio::test]
+ async fn test_invalid_lister() -> Result<()> {
+ let mut builder = Azblob::default();
+
+ builder
+ .container("container")
+ .account_name("account_name")
+ .account_key("account_key")
+ .endpoint("https://account_name.blob.core.windows.net");
+
+ let operator = Operator::new(builder)?.finish();
+
+ let lister = operator.lister("/").await?;
+
+ lister
+ .filter_map(|entry| {
+ dbg!(&entry);
+ future::ready(entry.ok())
+ })
+ .for_each(|entry| {
+ println!("{:?}", entry);
+ future::ready(())
+ })
+ .await;
+
+ Ok(())
+ }
+}