This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch polish-lister
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit cae17639273d81c9f0b68378bc51a75daf892a0f
Author: Xuanwo <[email protected]>
AuthorDate: Wed Nov 15 16:34:04 2023 +0800

    Save work
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/complete.rs             |   4 +-
 core/src/raw/oio/list/into_flat_page.rs | 264 +++++++++++++++-----------------
 2 files changed, 122 insertions(+), 146 deletions(-)

diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index cc7fdb61a..6a7e665ce 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -249,7 +249,7 @@ impl<A: Accessor> CompleteAccessor<A> {
             (_, false, false) => Err(self.new_unsupported_error(Operation::List)),
             // If recursive is true but service can't list_with_recursive
             (true, false, true) => {
-                let p = into_flat_page(self.inner.clone(), path, args.limit().unwrap_or(1000));
+                let p = into_flat_page(self.inner.clone(), path);
                 Ok((RpList::default(), CompleteLister::NeedFlat(p)))
             }
             // If recursive is false but service can't list_without_recursive
@@ -291,7 +291,7 @@ impl<A: Accessor> CompleteAccessor<A> {
             (_, false, false) => Err(self.new_unsupported_error(Operation::List)),
             // If recursive is true but service can't list_with_recursive
             (true, false, true) => {
-                let p = into_flat_page(self.inner.clone(), path, args.limit().unwrap_or(1000));
+                let p = into_flat_page(self.inner.clone(), path);
                 Ok((RpList::default(), CompleteLister::NeedFlat(p)))
             }
             // If recursive is false but service can't list_without_recursive
diff --git a/core/src/raw/oio/list/into_flat_page.rs b/core/src/raw/oio/list/into_flat_page.rs
index e7215fa45..d9bfdf113 100644
--- a/core/src/raw/oio/list/into_flat_page.rs
+++ b/core/src/raw/oio/list/into_flat_page.rs
@@ -15,16 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::VecDeque;
-use std::task::{Context, Poll};
+use std::task::{ready, Context, Poll};
 
 use async_trait::async_trait;
+use futures::future::BoxFuture;
+use futures::FutureExt;
 
 use crate::raw::*;
 use crate::*;
 
 /// to_flat_lister is used to make a hierarchy lister flat.
-pub fn into_flat_page<A: Accessor, P>(acc: A, path: &str, size: usize) -> FlatLister<A, P> {
+pub fn into_flat_page<A: Accessor, P>(acc: A, path: &str) -> FlatLister<A, P> {
     #[cfg(debug_assertions)]
     {
         let meta = acc.info();
@@ -35,12 +36,11 @@ pub fn into_flat_page<A: Accessor, P>(acc: A, path: &str, size: usize) -> FlatLi
     }
 
     FlatLister {
-        acc,
-        size,
+        acc: Some(acc),
         root: path.to_string(),
-        dirs: VecDeque::from([oio::Entry::new(path, Metadata::new(EntryMode::DIR))]),
-        listers: vec![],
-        res: Vec::with_capacity(size),
+        next_dir: Some(oio::Entry::new(path, Metadata::new(EntryMode::DIR))),
+        active_lister: vec![],
+        list_future: None,
     }
 }
 
@@ -80,74 +80,75 @@ pub fn into_flat_page<A: Accessor, P>(acc: A, path: &str, size: usize) -> FlatLi
 /// Especially, for storage services that can't return dirs first, ToFlatLister
 /// may output parent dirs' files before nested dirs, this is expected because files
 /// always output directly while listing.
-pub struct FlatLister<A: Accessor, P> {
-    acc: A,
-    size: usize,
+pub struct FlatLister<A: Accessor, L> {
+    acc: Option<A>,
     root: String,
-    dirs: VecDeque<oio::Entry>,
-    listers: Vec<(P, oio::Entry, Vec<oio::Entry>)>,
-    res: Vec<oio::Entry>,
+
+    next_dir: Option<oio::Entry>,
+    active_lister: Vec<(Option<oio::Entry>, L)>,
+    list_future: Option<BoxFuture<'static, (A, oio::Entry, Result<(RpList, L)>)>>,
 }
 
+/// # Safety
+///
+/// We will only take `&mut Self` references to FlatLister.
+unsafe impl<A: Accessor, L> Sync for FlatLister<A, L> {}
+
 #[async_trait]
-impl<A, P> oio::List for FlatLister<A, P>
+impl<A, L> oio::List for FlatLister<A, L>
 where
-    A: Accessor<Lister = P>,
-    P: oio::List,
+    A: Accessor<Lister = L>,
+    L: oio::List,
 {
     fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
-        todo!()
-        // loop {
-        //     if let Some(de) = self.dirs.pop_back() {
-        //         let (_, op) = self.acc.list(de.path(), OpList::new()).await?;
-        //         self.listers.push((op, de, vec![]))
-        //     }
-        //
-        //     let (mut lister, de, mut buf) = match self.listers.pop() {
-        //         Some((lister, de, buf)) => (lister, de, buf),
-        //         None => {
-        //             if !self.res.is_empty() {
-        //                 return Ok(Some(mem::take(&mut self.res)));
-        //             }
-        //             return Ok(None);
-        //         }
-        //     };
-        //
-        //     if buf.is_empty() {
-        //         match lister.poll_next(cx)? {
-        //             Some(v) => {
-        //                 buf = v;
-        //             }
-        //             None => {
-        //                 // Only push entry if it's not root dir
-        //                 if de.path() != self.root {
-        //                     self.res.push(de);
-        //                 }
-        //                 continue;
-        //             }
-        //         }
-        //     }
-        //
-        //     let mut buf = VecDeque::from(buf);
-        //     loop {
-        //         if let Some(oe) = buf.pop_front() {
-        //             if oe.mode().is_dir() {
-        //                 self.dirs.push_back(oe);
-        //                 self.listers.push((lister, de, buf.into()));
-        //                 break;
-        //             } else {
-        //                 self.res.push(oe)
-        //             }
-        //         } else {
-        //             self.listers.push((lister, de, vec![]));
-        //             break;
-        //         }
-        //     }
-        //
-        //     if self.res.len() >= self.size {
-        //         return Ok(Some(mem::take(&mut self.res)));
-        //     }
-        // }
+        loop {
+            if let Some(fut) = self.list_future.as_mut() {
+                let (acc, de, res) = ready!(fut.poll_unpin(cx));
+                self.acc = Some(acc);
+                self.list_future = None;
+
+                let (_, l) = res?;
+                self.active_lister.push((Some(de), l))
+            }
+
+            if let Some(de) = self.next_dir.take() {
+                let acc = self.acc.take().expect("Accessor must be valid");
+                let fut = async move {
+                    let res = acc.list(de.path(), OpList::new()).await;
+                    (acc, de, res)
+                };
+                self.list_future = Some(Box::pin(fut));
+                continue;
+            }
+
+            let (de, mut lister) = match self.active_lister.last_mut() {
+                Some((de, lister)) => (de, lister),
+                None => return Poll::Ready(Ok(None)),
+            };
+
+            match ready!(lister.poll_next(cx))? {
+                Some(v) if v.mode().is_dir() => {
+                    self.next_dir = Some(v);
+                    continue;
+                }
+                Some(v) => return Poll::Ready(Ok(Some(v))),
+                None => {
+                    match de.take() {
+                        Some(de) => {
+                            // Only push entry if it's not root dir
+                            if de.path() != self.root {
+                                return Poll::Ready(Ok(Some(de)));
+                            }
+                            continue;
+                        }
+                        None => {
+                            let _ = self.active_lister.pop();
+                            continue;
+                        }
+                    }
+                }
+            }
+        }
     }
 }
 
@@ -157,58 +158,43 @@ where
     P: oio::BlockingList,
 {
     fn next(&mut self) -> Result<Option<oio::Entry>> {
-        todo!()
-        // loop {
-        //     if let Some(de) = self.dirs.pop_back() {
-        //         let (_, op) = self.acc.blocking_list(de.path(), OpList::new())?;
-        //         self.listers.push((op, de, vec![]))
-        //     }
-        //
-        //     let (mut lister, de, mut buf) = match self.listers.pop() {
-        //         Some((lister, de, buf)) => (lister, de, buf),
-        //         None => {
-        //             if !self.res.is_empty() {
-        //                 return Ok(Some(mem::take(&mut self.res)));
-        //             }
-        //             return Ok(None);
-        //         }
-        //     };
-        //
-        //     if buf.is_empty() {
-        //         match lister.next()? {
-        //             Some(v) => {
-        //                 buf = v;
-        //             }
-        //             None => {
-        //                 // Only push entry if it's not root dir
-        //                 if de.path() != self.root {
-        //                     self.res.push(de);
-        //                 }
-        //                 continue;
-        //             }
-        //         }
-        //     }
-        //
-        //     let mut buf = VecDeque::from(buf);
-        //     loop {
-        //         if let Some(oe) = buf.pop_front() {
-        //             if oe.mode().is_dir() {
-        //                 self.dirs.push_back(oe);
-        //                 self.listers.push((lister, de, buf.into()));
-        //                 break;
-        //             } else {
-        //                 self.res.push(oe)
-        //             }
-        //         } else {
-        //             self.listers.push((lister, de, vec![]));
-        //             break;
-        //         }
-        //     }
-        //
-        //     if self.res.len() >= self.size {
-        //         return Ok(Some(mem::take(&mut self.res)));
-        //     }
-        // }
+        loop {
+            if let Some(de) = self.next_dir.take() {
+                let acc = self.acc.take().expect("Accessor must be valid");
+                let (_, l) = acc.blocking_list(de.path(), OpList::new())?;
+
+                self.acc = Some(acc);
+                self.active_lister.push((Some(de), l))
+            }
+
+            let (de, mut lister) = match self.active_lister.last_mut() {
+                Some((de, lister)) => (de, lister),
+                None => return Ok(None),
+            };
+
+            match lister.next()? {
+                Some(v) if v.mode().is_dir() => {
+                    self.next_dir = Some(v);
+                    continue;
+                }
+                Some(v) => return Ok(Some(v)),
+                None => {
+                    match de.take() {
+                        Some(de) => {
+                            // Only push entry if it's not root dir
+                            if de.path() != self.root {
+                                return Ok(Some(de));
+                            }
+                            continue;
+                        }
+                        None => {
+                            let _ = self.active_lister.pop();
+                            continue;
+                        }
+                    }
+                }
+            }
+        }
     }
 }
 
@@ -216,6 +202,7 @@ where
 mod tests {
     use std::collections::HashMap;
     use std::vec;
+    use std::vec::IntoIter;
 
     use log::debug;
     use oio::BlockingList;
@@ -240,7 +227,9 @@ mod tests {
         fn get(&self, path: &str) -> MockLister {
             let inner = self.map.get(path).expect("must have value").to_vec();
 
-            MockLister { inner, done: false }
+            MockLister {
+                inner: inner.into_iter(),
+            }
         }
     }
 
@@ -268,31 +257,18 @@ mod tests {
     }
 
     struct MockLister {
-        inner: Vec<&'static str>,
-        done: bool,
+        inner: IntoIter<&'static str>,
     }
 
     impl BlockingList for MockLister {
         fn next(&mut self) -> Result<Option<oio::Entry>> {
-            todo!()
-            // if self.done {
-            //     return Ok(None);
-            // }
-            // self.done = true;
-            //
-            // let entries = self
-            //     .inner
-            //     .iter()
-            //     .map(|path| {
-            //         if path.ends_with('/') {
-            //             oio::Entry::new(path, Metadata::new(EntryMode::DIR))
-            //         } else {
-            //             oio::Entry::new(path, Metadata::new(EntryMode::FILE))
-            //         }
-            //     })
-            //     .collect();
-            //
-            // Ok(Some(entries))
+            Ok(self.inner.next().map(|path| {
+                if path.ends_with('/') {
+                    oio::Entry::new(path, Metadata::new(EntryMode::DIR))
+                } else {
+                    oio::Entry::new(path, Metadata::new(EntryMode::FILE))
+                }
+            }))
         }
     }
 
@@ -301,7 +277,7 @@ mod tests {
         let _ = tracing_subscriber::fmt().with_test_writer().try_init();
 
         let acc = MockService::new();
-        let mut lister = into_flat_page(acc, "x/", 10);
+        let mut lister = into_flat_page(acc, "x/");
 
         let mut entries = Vec::default();
 

Reply via email to