This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch fix-invalid-lister in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit b04027fa28c111b26d8fba166d2c2d49ec580097 Author: Xuanwo <[email protected]> AuthorDate: Fri Aug 25 12:19:48 2023 +0800 fix(core): Invalid lister should not panic nor endless loop Signed-off-by: Xuanwo <[email protected]> --- core/src/types/list.rs | 72 ++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 58 insertions(+), 14 deletions(-) diff --git a/core/src/types/list.rs b/core/src/types/list.rs index 514056dba..6e67aa430 100644 --- a/core/src/types/list.rs +++ b/core/src/types/list.rs @@ -80,9 +80,11 @@ impl Stream for Lister { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { if let Some(fut) = self.stating.as_mut() { let (path, rp) = ready!(fut.poll_unpin(cx)); - let metadata = rp?.into_metadata(); + // Make sure we will not poll this future again. self.stating = None; + let metadata = rp?.into_metadata(); + return Poll::Ready(Some(Ok(Entry::new(path, metadata)))); } @@ -106,29 +108,32 @@ impl Stream for Lister { if let Some(fut) = self.listing.as_mut() { let (op, res) = ready!(fut.poll_unpin(cx)); - self.pager = Some(op); + + // Make sure we will not poll this future again. + self.listing = None; return match res? 
{ Some(oes) => { - self.listing = None; + self.pager = Some(op); self.buf = oes.into(); self.poll_next(cx) } - None => { - self.listing = None; - Poll::Ready(None) - } + None => Poll::Ready(None), }; } - let mut pager = self.pager.take().expect("pager must be valid"); - let fut = async move { - let res = pager.next().await; + match self.pager.take() { + Some(mut pager) => { + let fut = async move { + let res = pager.next().await; - (pager, res) - }; - self.listing = Some(Box::pin(fut)); - self.poll_next(cx) + (pager, res) + }; + self.listing = Some(Box::pin(fut)); + self.poll_next(cx) + } + None => Poll::Ready(None), + } } } @@ -177,3 +182,42 @@ impl Iterator for BlockingLister { self.next() } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::services::Azblob; + use futures::future; + use futures::StreamExt; + + /// Inspired by <https://gist.github.com/kyle-mccarthy/1e6ae89cc34495d731b91ebf5eb5a3d9> + /// + /// Invalid lister should not panic nor endless loop. + #[tokio::test] + async fn test_invalid_lister() -> Result<()> { + let mut builder = Azblob::default(); + + builder + .container("container") + .account_name("account_name") + .account_key("account_key") + .endpoint("https://account_name.blob.core.windows.net"); + + let operator = Operator::new(builder)?.finish(); + + let lister = operator.lister("/").await?; + + lister + .filter_map(|entry| { + dbg!(&entry); + future::ready(entry.ok()) + }) + .for_each(|entry| { + println!("{:?}", entry); + future::ready(()) + }) + .await; + + Ok(()) + } +}
