This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch polish-lister in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit cae17639273d81c9f0b68378bc51a75daf892a0f Author: Xuanwo <[email protected]> AuthorDate: Wed Nov 15 16:34:04 2023 +0800 Save work Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/complete.rs | 4 +- core/src/raw/oio/list/into_flat_page.rs | 264 +++++++++++++++----------------- 2 files changed, 122 insertions(+), 146 deletions(-) diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index cc7fdb61a..6a7e665ce 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -249,7 +249,7 @@ impl<A: Accessor> CompleteAccessor<A> { (_, false, false) => Err(self.new_unsupported_error(Operation::List)), // If recursive is true but service can't list_with_recursive (true, false, true) => { - let p = into_flat_page(self.inner.clone(), path, args.limit().unwrap_or(1000)); + let p = into_flat_page(self.inner.clone(), path); Ok((RpList::default(), CompleteLister::NeedFlat(p))) } // If recursive is false but service can't list_without_recursive @@ -291,7 +291,7 @@ impl<A: Accessor> CompleteAccessor<A> { (_, false, false) => Err(self.new_unsupported_error(Operation::List)), // If recursive is true but service can't list_with_recursive (true, false, true) => { - let p = into_flat_page(self.inner.clone(), path, args.limit().unwrap_or(1000)); + let p = into_flat_page(self.inner.clone(), path); Ok((RpList::default(), CompleteLister::NeedFlat(p))) } // If recursive is false but service can't list_without_recursive diff --git a/core/src/raw/oio/list/into_flat_page.rs b/core/src/raw/oio/list/into_flat_page.rs index e7215fa45..d9bfdf113 100644 --- a/core/src/raw/oio/list/into_flat_page.rs +++ b/core/src/raw/oio/list/into_flat_page.rs @@ -15,16 +15,17 @@ // specific language governing permissions and limitations // under the License. -use std::collections::VecDeque; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; use async_trait::async_trait; +use futures::future::BoxFuture; +use futures::FutureExt; use crate::raw::*; use crate::*; /// to_flat_lister is used to make a hierarchy lister flat. -pub fn into_flat_page<A: Accessor, P>(acc: A, path: &str, size: usize) -> FlatLister<A, P> { +pub fn into_flat_page<A: Accessor, P>(acc: A, path: &str) -> FlatLister<A, P> { #[cfg(debug_assertions)] { let meta = acc.info(); @@ -35,12 +36,11 @@ pub fn into_flat_page<A: Accessor, P>(acc: A, path: &str, size: usize) -> FlatLi } FlatLister { - acc, - size, + acc: Some(acc), root: path.to_string(), - dirs: VecDeque::from([oio::Entry::new(path, Metadata::new(EntryMode::DIR))]), - listers: vec![], - res: Vec::with_capacity(size), + next_dir: Some(oio::Entry::new(path, Metadata::new(EntryMode::DIR))), + active_lister: vec![], + list_future: None, } } @@ -80,74 +80,75 @@ pub fn into_flat_page<A: Accessor, P>(acc: A, path: &str, size: usize) -> FlatLi /// Especially, for storage services that can't return dirs first, ToFlatLister /// may output parent dirs' files before nested dirs, this is expected because files /// always output directly while listing. -pub struct FlatLister<A: Accessor, P> { - acc: A, - size: usize, +pub struct FlatLister<A: Accessor, L> { + acc: Option<A>, root: String, - dirs: VecDeque<oio::Entry>, - listers: Vec<(P, oio::Entry, Vec<oio::Entry>)>, - res: Vec<oio::Entry>, + + next_dir: Option<oio::Entry>, + active_lister: Vec<(Option<oio::Entry>, L)>, + list_future: Option<BoxFuture<'static, (A, oio::Entry, Result<(RpList, L)>)>>, } +/// # Safety +/// +/// We will only take `&mut Self` reference for FsLister. +unsafe impl<A: Accessor, L> Sync for FlatLister<A, L> {} + #[async_trait] -impl<A, P> oio::List for FlatLister<A, P> +impl<A, L> oio::List for FlatLister<A, L> where - A: Accessor<Lister = P>, - P: oio::List, + A: Accessor<Lister = L>, + L: oio::List, { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { - todo!() - // loop { - // if let Some(de) = self.dirs.pop_back() { - // let (_, op) = self.acc.list(de.path(), OpList::new()).await?; - // self.listers.push((op, de, vec![])) - // } - // - // let (mut lister, de, mut buf) = match self.listers.pop() { - // Some((lister, de, buf)) => (lister, de, buf), - // None => { - // if !self.res.is_empty() { - // return Ok(Some(mem::take(&mut self.res))); - // } - // return Ok(None); - // } - // }; - // - // if buf.is_empty() { - // match lister.poll_next(cx)? { - // Some(v) => { - // buf = v; - // } - // None => { - // // Only push entry if it's not root dir - // if de.path() != self.root { - // self.res.push(de); - // } - // continue; - // } - // } - // } - // - // let mut buf = VecDeque::from(buf); - // loop { - // if let Some(oe) = buf.pop_front() { - // if oe.mode().is_dir() { - // self.dirs.push_back(oe); - // self.listers.push((lister, de, buf.into())); - // break; - // } else { - // self.res.push(oe) - // } - // } else { - // self.listers.push((lister, de, vec![])); - // break; - // } - // } - // - // if self.res.len() >= self.size { - // return Ok(Some(mem::take(&mut self.res))); - // } - // } + loop { + if let Some(fut) = self.list_future.as_mut() { + let (acc, de, res) = ready!(fut.poll_unpin(cx)); + self.acc = Some(acc); + self.list_future = None; + + let (_, l) = res?; + self.active_lister.push((Some(de), l)) + } + + if let Some(de) = self.next_dir.take() { + let acc = self.acc.take().expect("Accessor must be valid"); + let fut = async move { + let res = acc.list(de.path(), OpList::new()).await; + (acc, de, res) + }; + self.list_future = Some(Box::pin(fut)); + continue; + } + + let (de, mut lister) = match self.active_lister.last_mut() { + Some((de, lister)) => (de, lister), + None => return Poll::Ready(Ok(None)), + }; + + match ready!(lister.poll_next(cx))? { + Some(v) if v.mode().is_dir() => { + self.next_dir = Some(v); + continue; + } + Some(v) => return Poll::Ready(Ok(Some(v))), + None => { + match de.take() { + Some(de) => { + // Only push entry if it's not root dir + if de.path() != self.root { + return Poll::Ready(Ok(Some(de))); + } + continue; + } + None => { + let _ = self.active_lister.pop(); + continue; + } + } + } + } + } } } @@ -157,58 +158,43 @@ where P: oio::BlockingList, { fn next(&mut self) -> Result<Option<oio::Entry>> { - todo!() - // loop { - // if let Some(de) = self.dirs.pop_back() { - // let (_, op) = self.acc.blocking_list(de.path(), OpList::new())?; - // self.listers.push((op, de, vec![])) - // } - // - // let (mut lister, de, mut buf) = match self.listers.pop() { - // Some((lister, de, buf)) => (lister, de, buf), - // None => { - // if !self.res.is_empty() { - // return Ok(Some(mem::take(&mut self.res))); - // } - // return Ok(None); - // } - // }; - // - // if buf.is_empty() { - // match lister.next()? { - // Some(v) => { - // buf = v; - // } - // None => { - // // Only push entry if it's not root dir - // if de.path() != self.root { - // self.res.push(de); - // } - // continue; - // } - // } - // } - // - // let mut buf = VecDeque::from(buf); - // loop { - // if let Some(oe) = buf.pop_front() { - // if oe.mode().is_dir() { - // self.dirs.push_back(oe); - // self.listers.push((lister, de, buf.into())); - // break; - // } else { - // self.res.push(oe) - // } - // } else { - // self.listers.push((lister, de, vec![])); - // break; - // } - // } - // - // if self.res.len() >= self.size { - // return Ok(Some(mem::take(&mut self.res))); - // } - // } + loop { + if let Some(de) = self.next_dir.take() { + let acc = self.acc.take().expect("Accessor must be valid"); + let (_, l) = acc.blocking_list(de.path(), OpList::new())?; + + self.acc = Some(acc); + self.active_lister.push((Some(de), l)) + } + + let (de, mut lister) = match self.active_lister.last_mut() { + Some((de, lister)) => (de, lister), + None => return Ok(None), + }; + + match lister.next()? { + Some(v) if v.mode().is_dir() => { + self.next_dir = Some(v); + continue; + } + Some(v) => return Ok(Some(v)), + None => { + match de.take() { + Some(de) => { + // Only push entry if it's not root dir + if de.path() != self.root { + return Ok(Some(de)); + } + continue; + } + None => { + let _ = self.active_lister.pop(); + continue; + } + } + } + } + } } } @@ -216,6 +202,7 @@ where mod tests { use std::collections::HashMap; use std::vec; + use std::vec::IntoIter; use log::debug; use oio::BlockingList; @@ -240,7 +227,9 @@ mod tests { fn get(&self, path: &str) -> MockLister { let inner = self.map.get(path).expect("must have value").to_vec(); - MockLister { inner, done: false } + MockLister { + inner: inner.into_iter(), + } } } @@ -268,31 +257,18 @@ mod tests { } struct MockLister { - inner: Vec<&'static str>, - done: bool, + inner: IntoIter<&'static str>, } impl BlockingList for MockLister { fn next(&mut self) -> Result<Option<oio::Entry>> { - todo!() - // if self.done { - // return Ok(None); - // } - // self.done = true; - // - // let entries = self - // .inner - // .iter() - // .map(|path| { - // if path.ends_with('/') { - // oio::Entry::new(path, Metadata::new(EntryMode::DIR)) - // } else { - // oio::Entry::new(path, Metadata::new(EntryMode::FILE)) - // } - // }) - // .collect(); - // - // Ok(Some(entries)) + Ok(self.inner.next().map(|path| { + if path.ends_with('/') { + oio::Entry::new(path, Metadata::new(EntryMode::DIR)) + } else { + oio::Entry::new(path, Metadata::new(EntryMode::FILE)) + } + })) } } @@ -301,7 +277,7 @@ mod tests { let _ = tracing_subscriber::fmt().with_test_writer().try_init(); let acc = MockService::new(); - let mut lister = into_flat_page(acc, "x/", 10); + let mut lister = into_flat_page(acc, "x/"); let mut entries = Vec::default();
