This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch polish-lister in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 4f7f7560cc0f81bf38488d63983e621e6ade76bc Author: Xuanwo <[email protected]> AuthorDate: Wed Nov 15 23:34:15 2023 +0800 polish errored Signed-off-by: Xuanwo <[email protected]> --- core/src/types/list.rs | 57 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/core/src/types/list.rs b/core/src/types/list.rs index 4b7813139..ce511155a 100644 --- a/core/src/types/list.rs +++ b/core/src/types/list.rs @@ -35,9 +35,10 @@ type StatFuture = BoxFuture<'static, (String, Result<RpStat>)>; /// Lister is designed to list entries at given path in an asynchronous /// manner. /// -/// Users can construct Lister by [`Operator::lister`]. +/// Users can construct Lister by [`Operator::lister`] or [`Operator::lister_with`]. /// -/// User can use lister as `Stream<Item = Result<Entry>>`. +/// - Lister implements `Stream<Item = Result<Entry>>`. +/// - Lister will return `None` if there is no more entries or error has been returned. pub struct Lister { acc: FusedAccessor, lister: oio::Lister, @@ -45,6 +46,7 @@ pub struct Lister { required_metakey: FlagSet<Metakey>, stating: Option<StatFuture>, + errored: bool, } /// # Safety @@ -64,6 +66,7 @@ impl Lister { required_metakey, stating: None, + errored: false, }) } } @@ -72,22 +75,20 @@ impl Stream for Lister { type Item = Result<Entry>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + // Returns `None` if we have errored. + if self.errored { + return Poll::Ready(None); + } + if let Some(fut) = self.stating.as_mut() { let (path, rp) = ready!(fut.poll_unpin(cx)); // Make sure we will not poll this future again. self.stating = None; - // TODO: we should rebuild the future if stat failed. let metadata = match rp { Ok(rp) => rp.into_metadata(), Err(err) => { - let acc = self.acc.clone(); - let fut = async move { - let res = acc.stat(&path, OpStat::default()).await; - - (path, res) - }; - self.stating = Some(Box::pin(fut)); + self.errored = true; return Poll::Ready(Some(Err(err))); } }; @@ -95,8 +96,8 @@ impl Stream for Lister { return Poll::Ready(Some(Ok(Entry::new(path, metadata)))); } - match ready!(self.lister.poll_next(cx))? { - Some(oe) => { + match ready!(self.lister.poll_next(cx)) { + Ok(Some(oe)) => { let (path, metadata) = oe.into_entry().into_parts(); // TODO: we can optimize this by checking the provided metakey provided by services. if metadata.contains_metakey(self.required_metakey) { @@ -112,7 +113,11 @@ impl Stream for Lister { self.stating = Some(Box::pin(fut)); self.poll_next(cx) } - None => Poll::Ready(None), + Ok(None) => Poll::Ready(None), + Err(err) => { + self.errored = true; + Poll::Ready(Some(Err(err))) + } } } } @@ -120,13 +125,17 @@ impl Stream for Lister { /// BlockingLister is designed to list entries at given path in a blocking /// manner. /// -/// Users can construct Lister by `blocking_lister`. +/// Users can construct Lister by [`BlockingOperator::lister`] or [`BlockingOperator::lister_with`]. +/// +/// - Lister implements `Iterator<Item = Result<Entry>>`. +/// - Lister will return `None` if there is no more entries or error has been returned. pub struct BlockingLister { acc: FusedAccessor, /// required_metakey is the metakey required by users. required_metakey: FlagSet<Metakey>, lister: oio::BlockingLister, + errored: bool, } /// # Safety @@ -145,6 +154,7 @@ impl BlockingLister { required_metakey, lister, + errored: false, }) } } @@ -154,12 +164,18 @@ impl Iterator for BlockingLister { type Item = Result<Entry>; fn next(&mut self) -> Option<Self::Item> { + // Returns `None` if we have errored. + if self.errored { + return None; + } + let entry = match self.lister.next() { Ok(Some(entry)) => entry, - Ok(None) => { - return None; + Ok(None) => return None, + Err(err) => { + self.errored = true; + return Some(Err(err)); } - Err(err) => return Some(Err(err)), }; let (path, metadata) = entry.into_entry().into_parts(); @@ -170,7 +186,10 @@ impl Iterator for BlockingLister { let metadata = match self.acc.blocking_stat(&path, OpStat::default()) { Ok(rp) => rp.into_metadata(), - Err(err) => return Some(Err(err)), + Err(err) => { + self.errored = true; + return Some(Err(err)); + } }; Some(Ok(Entry::new(path, metadata))) } @@ -189,6 +208,8 @@ mod tests { /// Invalid lister should not panic nor endless loop. #[tokio::test] async fn test_invalid_lister() -> Result<()> { + let _ = tracing_subscriber::fmt().try_init(); + let mut builder = Azblob::default(); builder
