This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch polish-lister in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit c6123160467564fbe84ec46de3cb8751f11553c2 Author: Xuanwo <[email protected]> AuthorDate: Wed Nov 15 15:28:13 2023 +0800 Save work Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/blocking.rs | 3 +- core/src/layers/complete.rs | 11 +-- core/src/layers/concurrent_limit.rs | 4 +- core/src/layers/error_context.rs | 4 +- core/src/layers/immutable_index.rs | 6 +- core/src/layers/logging.rs | 12 +-- core/src/layers/madsim.rs | 8 +- core/src/layers/minitrace.rs | 12 +-- core/src/layers/oteltrace.rs | 4 +- core/src/layers/retry.rs | 76 +++++++++------ core/src/layers/timeout.rs | 33 +++++-- core/src/layers/tracing.rs | 4 +- core/src/raw/adapters/kv/backend.rs | 54 ++++++----- core/src/raw/adapters/typed_kv/backend.rs | 53 +++++----- core/src/raw/oio/list/api.rs | 19 ++-- core/src/raw/oio/list/into_flat_page.rs | 106 ++++++++++---------- core/src/raw/oio/list/into_hierarchy_pager.rs | 26 ++--- core/src/raw/oio/list/mod.rs | 5 + core/src/raw/oio/list/page_list.rs | 121 +++++++++++++++++++++++ core/src/services/alluxio/backend.rs | 8 +- core/src/services/alluxio/lister.rs | 28 ++---- core/src/services/azblob/backend.rs | 6 +- core/src/services/azblob/lister.rs | 27 ++---- core/src/services/azdls/backend.rs | 6 +- core/src/services/azdls/lister.rs | 44 +++------ core/src/services/azfile/backend.rs | 6 +- core/src/services/azfile/lister.rs | 49 +++------- core/src/services/cos/backend.rs | 8 +- core/src/services/cos/lister.rs | 31 ++---- core/src/services/dbfs/backend.rs | 6 +- core/src/services/dbfs/lister.rs | 28 ++---- core/src/services/fs/lister.rs | 61 ++++++++---- core/src/services/ftp/lister.rs | 49 +++++----- core/src/services/gcs/backend.rs | 21 ++-- core/src/services/gcs/lister.rs | 34 +++---- core/src/services/gdrive/backend.rs | 8 +- core/src/services/gdrive/core.rs | 25 ++--- core/src/services/gdrive/lister.rs | 72 +++++--------- core/src/services/hdfs/lister.rs | 49 +++++----- core/src/services/ipfs/backend.rs | 26 ++--- core/src/services/ipmfs/backend.rs | 8 +- core/src/services/ipmfs/lister.rs | 53 ++++------ core/src/services/obs/backend.rs | 8 +- core/src/services/obs/lister.rs | 27 ++---- core/src/services/onedrive/backend.rs | 7 +- core/src/services/onedrive/lister.rs | 135 +++++++++++--------------- core/src/services/oss/backend.rs | 20 ++-- core/src/services/oss/core.rs | 14 +-- core/src/services/oss/lister.rs | 34 +++---- core/src/services/s3/backend.rs | 20 ++-- core/src/services/s3/lister.rs | 35 +++---- core/src/services/sftp/lister.rs | 19 ++-- core/src/services/swift/backend.rs | 6 +- core/src/services/swift/lister.rs | 22 ++--- core/src/services/webdav/backend.rs | 9 +- core/src/services/webdav/lister.rs | 32 +++--- core/src/services/webhdfs/backend.rs | 75 +++----------- core/src/services/webhdfs/lister.rs | 110 +++++++++------------ core/src/types/list.rs | 72 ++++++-------- 59 files changed, 872 insertions(+), 987 deletions(-) diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index 67281ba25..3181062e7 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -315,7 +315,8 @@ impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> { impl<I: oio::List> oio::BlockingList for BlockingWrapper<I> { fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - self.handle.block_on(self.inner.next()) + todo!() + // self.handle.block_on(poll_fn(|cx| self.inner.poll_next(cx))) } } diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 7fd2037dc..ea4906cf6 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -27,7 +27,6 @@ use std::task::Poll; use async_trait::async_trait; use bytes::Bytes; -use crate::raw::oio::Entry; use crate::raw::oio::FlatLister; use crate::raw::oio::HierarchyLister; use crate::raw::oio::RangeReader; @@ -639,13 +638,13 @@ where A: Accessor<Lister = P>, P: oio::List, { - async fn next(&mut self) -> Result<Option<Vec<Entry>>> { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { use CompleteLister::*; match self { - AlreadyComplete(p) => p.next().await, - NeedFlat(p) => p.next().await, - NeedHierarchy(p) => p.next().await, + AlreadyComplete(p) => p.poll_next(cx), + NeedFlat(p) => p.poll_next(cx), + NeedHierarchy(p) => p.poll_next(cx), } } } @@ -655,7 +654,7 @@ where A: Accessor<BlockingLister = P>, P: oio::BlockingList, { - fn next(&mut self) -> Result<Option<Vec<Entry>>> { + fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { use CompleteLister::*; match self { diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs index 00f9a9a8c..d839b97d9 100644 --- a/core/src/layers/concurrent_limit.rs +++ b/core/src/layers/concurrent_limit.rs @@ -310,8 +310,8 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> { #[async_trait] impl<R: oio::List> oio::List for ConcurrentLimitWrapper<R> { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - self.inner.next().await + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + self.inner.poll_next(cx) } } diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs index 3278d502c..c3a0feef8 100644 --- a/core/src/layers/error_context.rs +++ b/core/src/layers/error_context.rs @@ -453,8 +453,8 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> { #[async_trait::async_trait] impl<T: oio::List> oio::List for ErrorContextWrapper<T> { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - self.inner.next().await.map_err(|err| { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + self.inner.poll_next(cx).map_err(|err| { err.with_operation(ListOperation::Next) .with_context("service", self.scheme) .with_context("path", &self.path) diff --git a/core/src/layers/immutable_index.rs b/core/src/layers/immutable_index.rs index 6137687b8..9d47d78ff 100644 --- a/core/src/layers/immutable_index.rs +++ b/core/src/layers/immutable_index.rs @@ -18,6 +18,7 @@ use std::collections::HashSet; use std::fmt::Debug; use std::mem; +use std::task::{Context, Poll}; use async_trait::async_trait; @@ -240,8 +241,9 @@ impl ImmutableDir { #[async_trait] impl oio::List for ImmutableDir { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - Ok(self.inner_next_page()) + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + todo!() + // Ok(self.inner_next_page()) } } diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs index b556cc7aa..16b91cef5 100644 --- a/core/src/layers/logging.rs +++ b/core/src/layers/logging.rs @@ -1476,18 +1476,18 @@ impl<P> Drop for LoggingLister<P> { #[async_trait] impl<P: oio::List> oio::List for LoggingLister<P> { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - let res = self.inner.next().await; + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + let res = ready!(self.inner.poll_next(cx)); match &res { - Ok(Some(des)) => { + Ok(Some(de)) => { debug!( target: LOGGING_TARGET, - "service={} operation={} path={} -> listed {} entries", + "service={} operation={} path={} -> listed entry: {}", self.ctx.scheme, self.op, self.path, - des.len(), + de.path(), ); } Ok(None) => { @@ -1515,7 +1515,7 @@ impl<P: oio::List> oio::List for LoggingLister<P> { } }; - res + Poll::Ready(res) } } diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index 9eb75da2d..1b5b602aa 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -39,8 +39,6 @@ use madsim::net::Endpoint; #[cfg(madsim)] use madsim::net::Payload; -use crate::raw::oio; -use crate::raw::oio::Entry; use crate::raw::*; use crate::*; @@ -335,11 +333,11 @@ pub struct MadsimLister {} #[async_trait] impl oio::List for MadsimLister { - async fn next(&mut self) -> crate::Result<Option<Vec<Entry>>> { - Err(Error::new( + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<Option<oio::Entry>>> { + Poll::Ready(Err(Error::new( ErrorKind::Unsupported, "will be supported in the future", - )) + ))) } } diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs index 26df33004..b0f192ed3 100644 --- a/core/src/layers/minitrace.rs +++ b/core/src/layers/minitrace.rs @@ -372,14 +372,10 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for MinitraceWrapper<R> { #[async_trait] impl<R: oio::List> oio::List for MinitraceWrapper<R> { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - self.inner - .next() - .in_span(Span::enter_with_parent( - ListOperation::Next.into_static(), - &self.span, - )) - .await + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + let _g = self.span.set_local_parent(); + let _span = LocalSpan::enter_with_local_parent(ListOperation::Next.into_static()); + self.inner.poll_next(cx) } } diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs index 0244e3190..c4d8b6f0e 100644 --- a/core/src/layers/oteltrace.rs +++ b/core/src/layers/oteltrace.rs @@ -331,8 +331,8 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for OtelTraceWrapper<R> { #[async_trait] impl<R: oio::List> oio::List for OtelTraceWrapper<R> { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - self.inner.next().await + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + self.inner.poll_next(cx) } } diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index d0162ccdf..67528347d 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -1051,28 +1051,48 @@ impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for RetryWra #[async_trait] impl<P: oio::List, I: RetryInterceptor> oio::List for RetryWrapper<P, I> { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - let mut backoff = self.builder.build(); - - loop { - match self.inner.next().await { - Ok(v) => return Ok(v), - Err(e) if !e.is_temporary() => return Err(e), - Err(e) => match backoff.next() { - None => return Err(e), + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + if let Some(sleep) = self.sleep.as_mut() { + ready!(sleep.poll_unpin(cx)); + self.sleep = None; + } + + match ready!(self.inner.poll_next(cx)) { + Ok(v) => { + self.current_backoff = None; + Poll::Ready(Ok(v)) + } + Err(err) if !err.is_temporary() => { + self.current_backoff = None; + Poll::Ready(Err(err)) + } + Err(err) => { + let backoff = match self.current_backoff.as_mut() { + Some(backoff) => backoff, + None => { + self.current_backoff = Some(self.builder.build()); + self.current_backoff.as_mut().unwrap() + } + }; + + match backoff.next() { + None => { + self.current_backoff = None; + Poll::Ready(Err(err)) + } Some(dur) => { self.notify.intercept( - &e, + &err, dur, &[ ("operation", ListOperation::Next.into_static()), ("path", &self.path), ], ); - tokio::time::sleep(dur).await; - continue; + self.sleep = Some(Box::pin(tokio::time::sleep(dur))); + self.poll_next(cx) } - }, + } } } } @@ -1296,37 +1316,33 @@ mod tests { } #[async_trait] impl oio::List for MockLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { self.attempt += 1; - match self.attempt { + let result = match self.attempt { 1 => Err(Error::new( ErrorKind::RateLimited, "retryable rate limited error from lister", ) .set_temporary()), - 2 => { - let entries = vec![ - oio::Entry::new("hello", Metadata::new(EntryMode::FILE)), - oio::Entry::new("world", Metadata::new(EntryMode::FILE)), - ]; - Ok(Some(entries)) - } + 2 => Ok(Some(oio::Entry::new( + "hello", + Metadata::new(EntryMode::FILE), + ))), 3 => Err( Error::new(ErrorKind::Unexpected, "retryable internal server error") .set_temporary(), ), - 4 => { - let entries = vec![ - oio::Entry::new("2023/", Metadata::new(EntryMode::DIR)), - oio::Entry::new("0208/", Metadata::new(EntryMode::DIR)), - ]; - Ok(Some(entries)) - } + 4 => Ok(Some(oio::Entry::new( + "2023/", + Metadata::new(EntryMode::DIR), + ))), 5 => Ok(None), _ => { unreachable!() } - } + }; + + Poll::Ready(result) } } diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index 64b55ff18..8f04c3363 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -417,14 +417,33 @@ impl<R: oio::Write> oio::Write for TimeoutWrapper<R> { #[async_trait] impl<R: oio::List> oio::List for TimeoutWrapper<R> { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - tokio::time::timeout(self.timeout, self.inner.next()) - .await - .map_err(|_| { - Error::new(ErrorKind::Unexpected, "operation timeout") + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + match self.start { + Some(start) => { + if start.elapsed() > self.timeout { + // Clean up the start time before return ready. + self.start = None; + + return Poll::Ready(Err(Error::new( + ErrorKind::Unexpected, + "operation timeout", + ) .with_operation(ListOperation::Next) .with_context("timeout", self.timeout.as_secs_f64().to_string()) - .set_temporary() - })? + .set_temporary())); + } + } + None => { + self.start = Some(Instant::now()); + } + } + + match self.inner.poll_next(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(v) => { + self.start = None; + Poll::Ready(v) + } + } } } diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs index 418be8235..38ae5f2ad 100644 --- a/core/src/layers/tracing.rs +++ b/core/src/layers/tracing.rs @@ -366,8 +366,8 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for TracingWrapper<R> { #[async_trait] impl<R: oio::List> oio::List for TracingWrapper<R> { #[tracing::instrument(parent = &self.span, level = "debug", skip_all)] - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - self.inner.next().await + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + self.inner.poll_next(cx) } } diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 4360e6f39..b5ddf9440 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use std::task::ready; use std::task::Context; use std::task::Poll; +use std::vec::IntoIter; use async_trait::async_trait; use bytes::Bytes; @@ -303,47 +304,52 @@ where pub struct KvLister { root: String, - inner: Option<Vec<String>>, + inner: IntoIter<String>, } impl KvLister { fn new(root: &str, inner: Vec<String>) -> Self { Self { root: root.to_string(), - inner: Some(inner), + inner: inner.into_iter(), } } - - fn inner_next_page(&mut self) -> Option<Vec<oio::Entry>> { - let res = self - .inner - .take()? - .into_iter() - .map(|v| { - let mode = if v.ends_with('/') { - EntryMode::DIR - } else { - EntryMode::FILE - }; - - oio::Entry::new(&build_rel_path(&self.root, &v), Metadata::new(mode)) - }) - .collect(); - - Some(res) - } } #[async_trait] impl oio::List for KvLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - Ok(self.inner_next_page()) + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + let entry = self.inner.next().map(|v| { + let mode = if v.ends_with('/') { + EntryMode::DIR + } else { + EntryMode::FILE + }; + + oio::Entry::new(&build_rel_path(&self.root, &v), Metadata::new(mode)) + }); + + Poll::Ready(Ok(entry)) } } impl oio::BlockingList for KvLister { fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - Ok(self.inner_next_page()) + Ok(Some( + self.inner + .as_slice() + .iter() + .map(|v| { + let mode = if v.ends_with('/') { + EntryMode::DIR + } else { + EntryMode::FILE + }; + + oio::Entry::new(&build_rel_path(&self.root, &v), Metadata::new(mode)) + }) + .collect(), + )) } } diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index 2a1da5966..e60e21649 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use std::task::ready; use std::task::Context; use std::task::Poll; +use std::vec::IntoIter; use async_trait::async_trait; use bytes::Bytes; @@ -304,47 +305,51 @@ where pub struct KvLister { root: String, - inner: Option<Vec<String>>, + inner: IntoIter<String>, } impl KvLister { fn new(root: &str, inner: Vec<String>) -> Self { Self { root: root.to_string(), - inner: Some(inner), + inner: inner.into_iter(), } } - - fn inner_next_page(&mut self) -> Option<Vec<oio::Entry>> { - let res = self - .inner - .take()? - .into_iter() - .map(|v| { - let mode = if v.ends_with('/') { - EntryMode::DIR - } else { - EntryMode::FILE - }; - - oio::Entry::new(&build_rel_path(&self.root, &v), Metadata::new(mode)) - }) - .collect(); - - Some(res) - } } #[async_trait] impl oio::List for KvLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - Ok(self.inner_next_page()) + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + let entry = self.inner.next().map(|v| { + let mode = if v.ends_with('/') { + EntryMode::DIR + } else { + EntryMode::FILE + }; + + oio::Entry::new(&build_rel_path(&self.root, &v), Metadata::new(mode)) + }); + + Poll::Ready(Ok(entry)) } } impl oio::BlockingList for KvLister { fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - Ok(self.inner_next_page()) + Ok(Some( + self.inner + .clone() + .map(|v| { + let mode = if v.ends_with('/') { + EntryMode::DIR + } else { + EntryMode::FILE + }; + + oio::Entry::new(&build_rel_path(&self.root, &v), Metadata::new(mode)) + }) + .collect(), + )) } } diff --git a/core/src/raw/oio/list/api.rs b/core/src/raw/oio/list/api.rs index c08f5d294..b7899e60d 100644 --- a/core/src/raw/oio/list/api.rs +++ b/core/src/raw/oio/list/api.rs @@ -17,6 +17,7 @@ use std::fmt::Display; use std::fmt::Formatter; +use std::task::{Context, Poll}; use async_trait::async_trait; @@ -27,7 +28,7 @@ use crate::*; #[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)] #[non_exhaustive] pub enum ListOperation { - /// Operation for [`List::next`] + /// Operation for [`List::poll_next`] Next, /// Operation for [`BlockingList::next`] BlockingNext, @@ -64,7 +65,7 @@ pub trait List: Send + Sync + 'static { /// /// `Ok(None)` means all pages have been returned. Any following call /// to `next` will always get the same result. - async fn next(&mut self) -> Result<Option<Vec<Entry>>>; + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<Entry>>>; } /// The boxed version of [`List`] @@ -72,24 +73,24 @@ pub type Lister = Box<dyn List>; #[async_trait] impl<P: List + ?Sized> List for Box<P> { - async fn next(&mut self) -> Result<Option<Vec<Entry>>> { - (**self).next().await + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<Entry>>> { + (**self).poll_next(cx) } } #[async_trait] impl List for () { - async fn next(&mut self) -> Result<Option<Vec<Entry>>> { - Ok(None) + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<Entry>>> { + Poll::Ready(Ok(None)) } } #[async_trait] impl<P: List> List for Option<P> { - async fn next(&mut self) -> Result<Option<Vec<Entry>>> { + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<Entry>>> { match self { - Some(p) => p.next().await, - None => Ok(None), + Some(p) => p.poll_next(cx), + None => Poll::Ready(Ok(None)), } } } diff --git a/core/src/raw/oio/list/into_flat_page.rs b/core/src/raw/oio/list/into_flat_page.rs index ee9966b4c..6c2ef6207 100644 --- a/core/src/raw/oio/list/into_flat_page.rs +++ b/core/src/raw/oio/list/into_flat_page.rs @@ -17,6 +17,7 @@ use std::collections::VecDeque; use std::mem; +use std::task::{Context, Poll}; use async_trait::async_trait; @@ -95,58 +96,59 @@ where A: Accessor<Lister = P>, P: oio::List, { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - loop { - if let Some(de) = self.dirs.pop_back() { - let (_, op) = self.acc.list(de.path(), OpList::new()).await?; - self.listers.push((op, de, vec![])) - } - - let (mut lister, de, mut buf) = match self.listers.pop() { - Some((lister, de, buf)) => (lister, de, buf), - None => { - if !self.res.is_empty() { - return Ok(Some(mem::take(&mut self.res))); - } - return Ok(None); - } - }; - - if buf.is_empty() { - match lister.next().await? { - Some(v) => { - buf = v; - } - None => { - // Only push entry if it's not root dir - if de.path() != self.root { - self.res.push(de); - } - continue; - } - } - } - - let mut buf = VecDeque::from(buf); - loop { - if let Some(oe) = buf.pop_front() { - if oe.mode().is_dir() { - self.dirs.push_back(oe); - self.listers.push((lister, de, buf.into())); - break; - } else { - self.res.push(oe) - } - } else { - self.listers.push((lister, de, vec![])); - break; - } - } - - if self.res.len() >= self.size { - return Ok(Some(mem::take(&mut self.res))); - } - } + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + todo!() + // loop { + // if let Some(de) = self.dirs.pop_back() { + // let (_, op) = self.acc.list(de.path(), OpList::new()).await?; + // self.listers.push((op, de, vec![])) + // } + // + // let (mut lister, de, mut buf) = match self.listers.pop() { + // Some((lister, de, buf)) => (lister, de, buf), + // None => { + // if !self.res.is_empty() { + // return Ok(Some(mem::take(&mut self.res))); + // } + // return Ok(None); + // } + // }; + // + // if buf.is_empty() { + // match lister.poll_next(cx)? { + // Some(v) => { + // buf = v; + // } + // None => { + // // Only push entry if it's not root dir + // if de.path() != self.root { + // self.res.push(de); + // } + // continue; + // } + // } + // } + // + // let mut buf = VecDeque::from(buf); + // loop { + // if let Some(oe) = buf.pop_front() { + // if oe.mode().is_dir() { + // self.dirs.push_back(oe); + // self.listers.push((lister, de, buf.into())); + // break; + // } else { + // self.res.push(oe) + // } + // } else { + // self.listers.push((lister, de, vec![])); + // break; + // } + // } + // + // if self.res.len() >= self.size { + // return Ok(Some(mem::take(&mut self.res))); + // } + // } } } diff --git a/core/src/raw/oio/list/into_hierarchy_pager.rs b/core/src/raw/oio/list/into_hierarchy_pager.rs index 2971ad2a1..217371b45 100644 --- a/core/src/raw/oio/list/into_hierarchy_pager.rs +++ b/core/src/raw/oio/list/into_hierarchy_pager.rs @@ -16,6 +16,7 @@ // under the License. use std::collections::HashSet; +use std::task::{Context, Poll}; use async_trait::async_trait; @@ -118,18 +119,19 @@ impl<P> HierarchyLister<P> { #[async_trait] impl<P: oio::List> oio::List for HierarchyLister<P> { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - let page = self.lister.next().await?; - - let entries = if let Some(entries) = page { - entries - } else { - return Ok(None); - }; - - let entries = self.filter_entries(entries); - - Ok(Some(entries)) + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + todo!() + // let page = self.lister.poll_next(cx)?; + // + // let entries = if let Some(entries) = page { + // entries + // } else { + // return Ok(None); + // }; + // + // let entries = self.filter_entries(entries); + // + // Ok(Some(entries)) } } diff --git a/core/src/raw/oio/list/mod.rs b/core/src/raw/oio/list/mod.rs index 0a4b0e39e..63fe41292 100644 --- a/core/src/raw/oio/list/mod.rs +++ b/core/src/raw/oio/list/mod.rs @@ -22,6 +22,11 @@ pub use api::List; pub use api::ListOperation; pub use api::Lister; +mod page_list; +pub use page_list::PageContext; +pub use page_list::PageList; +pub use page_list::PageLister; + mod into_flat_page; pub use into_flat_page::into_flat_page; pub use into_flat_page::FlatLister; diff --git a/core/src/raw/oio/list/page_list.rs b/core/src/raw/oio/list/page_list.rs new file mode 100644 index 000000000..ed14a57d8 --- /dev/null +++ b/core/src/raw/oio/list/page_list.rs @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use async_trait::async_trait; +use futures::future::BoxFuture; +use std::collections::VecDeque; +use std::task::{ready, Context, Poll}; + +use crate::raw::*; +use crate::*; + +/// PageList is used to implement [`List`] based on API supporting pagination. By implementing +/// PageList, services don't need to care about the details of page list. +/// +/// # Architecture +/// +/// The architecture after adopting [`PageList`]: +/// +/// - Services impl `PageList` +/// - `PageLister` impl `List` +/// - Expose `PageLister` as `Accessor::Lister` +#[async_trait] +pub trait PageList: Send + Sync + Unpin + 'static { + /// next_page is used to fetch next page of entries from underlying storage. + async fn next_page(&self, ctx: &mut PageContext) -> Result<()>; +} + +pub struct PageContext { + /// done is used to indicate whether the list operation is done. + pub done: bool, + /// token is used by underlying storage services to fetch next page. + pub token: String, + /// entries is used to store entries fetched from underlying storage. + /// + /// Please always reuse the same `VecDeque` to avoid unnecessary memory allocation. + /// PageLister makes sure that entries is reset before calling `next_page`. Implementor + /// can calling `push_back` on `entries` directly. + pub entries: VecDeque<oio::Entry>, +} + +pub struct PageLister<L: PageList> { + state: State<L>, +} + +enum State<L> { + Idle(Option<(L, PageContext)>), + Fetch(BoxFuture<'static, ((L, PageContext), Result<()>)>), +} + +/// # Safety +/// +/// We will only take `&mut Self` reference for State. +unsafe impl<L: PageList> Sync for State<L> {} + +impl<L> PageLister<L> +where + L: PageList, +{ + /// Create a new PageLister. + pub fn new(l: L) -> Self { + Self { + state: State::Idle(Some(( + l, + PageContext { + done: false, + token: "".to_string(), + entries: VecDeque::new(), + }, + ))), + } + } +} + +impl<L> oio::List for PageLister<L> +where + L: PageList, +{ + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + loop { + match &mut self.state { + State::Idle(st) => { + if let Some((_, ctx)) = st.as_mut() { + if let Some(entry) = ctx.entries.pop_front() { + return Poll::Ready(Ok(Some(entry))); + } + if ctx.done { + return Poll::Ready(Ok(None)); + } + } + + let (l, mut ctx) = st.take().expect("lister must be valid"); + let fut = async move { + let res = l.next_page(&mut ctx).await; + ((l, ctx), res) + }; + self.state = State::Fetch(Box::pin(fut)); + } + State::Fetch(fut) => { + let ((l, ctx), res) = ready!(fut.as_mut().poll(cx)); + self.state = State::Idle(Some((l, ctx))); + + res?; + } + } + } + } +} diff --git a/core/src/services/alluxio/backend.rs b/core/src/services/alluxio/backend.rs index d5c9b3a12..64db4461f 100644 --- a/core/src/services/alluxio/backend.rs +++ b/core/src/services/alluxio/backend.rs @@ -185,7 +185,7 @@ impl Accessor for AlluxioBackend { type BlockingReader = (); type Writer = AlluxioWriters; type BlockingWriter = (); - type Lister = AlluxioLister; + type Lister = oio::PageLister<AlluxioLister>; type BlockingLister = (); fn info(&self) -> AccessorInfo { @@ -253,10 +253,8 @@ impl Accessor for AlluxioBackend { } async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> { - Ok(( - RpList::default(), - AlluxioLister::new(self.core.clone(), path), - )) + let l = AlluxioLister::new(self.core.clone(), path); + Ok((RpList::default(), oio::PageLister::new(l))) } } diff --git a/core/src/services/alluxio/lister.rs b/core/src/services/alluxio/lister.rs index c1efa6b3a..5e56dbff8 100644 --- a/core/src/services/alluxio/lister.rs +++ b/core/src/services/alluxio/lister.rs @@ -29,8 +29,6 @@ pub struct AlluxioLister { core: Arc<AlluxioCore>, path: String, - - done: bool, } impl AlluxioLister { @@ -38,23 +36,19 @@ impl AlluxioLister { AlluxioLister { core, path: path.to_string(), - done: false, } } } #[async_trait] -impl oio::List for AlluxioLister { - async fn next(&mut self) -> Result<Option<Vec<Entry>>> { - if self.done { - return Ok(None); - } - +impl oio::PageList for AlluxioLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let result = self.core.list_status(&self.path).await; match result { Ok(file_infos) => { - let mut entries = vec![]; + ctx.done = true; + for file_info in file_infos { let path: String = file_info.path.clone(); let path = if file_info.folder { @@ -62,24 +56,18 @@ impl oio::List for AlluxioLister { } else { path }; - entries.push(Entry::new( + ctx.entries.push_back(Entry::new( &build_rel_path(&self.core.root, &path), file_info.try_into()?, )); } - if entries.is_empty() { - return Ok(None); - } - - self.done = true; - - Ok(Some(entries)) + Ok(()) } Err(e) => { if e.kind() == ErrorKind::NotFound { - self.done = true; - return Ok(None); + ctx.done = true; + return Ok(()); } Err(e) } diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs index a87907477..b3e8caad9 100644 --- a/core/src/services/azblob/backend.rs +++ b/core/src/services/azblob/backend.rs @@ -546,7 +546,7 @@ impl Accessor for AzblobBackend { type BlockingReader = (); type Writer = AzblobWriters; type BlockingWriter = (); - type Lister = AzblobLister; + type Lister = oio::PageLister<AzblobLister>; type BlockingLister = (); fn info(&self) -> AccessorInfo { @@ -689,14 +689,14 @@ impl Accessor for AzblobBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - let op = AzblobLister::new( + let l = AzblobLister::new( self.core.clone(), path.to_string(), args.recursive(), args.limit(), ); - Ok((RpList::default(), op)) + Ok((RpList::default(), oio::PageLister::new(l))) } async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> { diff --git a/core/src/services/azblob/lister.rs b/core/src/services/azblob/lister.rs index 25ccc1248..96e15821e 100644 --- a/core/src/services/azblob/lister.rs +++ b/core/src/services/azblob/lister.rs @@ -33,9 +33,6 @@ pub struct AzblobLister { path: String, delimiter: &'static str, limit: Option<usize>, - - next_marker: String, - done: bool, } impl AzblobLister { @@ -47,23 +44,16 @@ impl AzblobLister { path, delimiter, limit, - - next_marker: "".to_string(), - done: false, } } } #[async_trait] -impl oio::List for AzblobLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - if self.done { - return Ok(None); - } - +impl oio::PageList for AzblobLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self .core - .azblob_list_blobs(&self.path, &self.next_marker, self.delimiter, self.limit) + .azblob_list_blobs(&self.path, &ctx.token, self.delimiter, self.limit) .await?; if resp.status() != http::StatusCode::OK { @@ -80,12 +70,11 @@ impl oio::List for AzblobLister { // // - Check `next_marker` if let Some(next_marker) = output.next_marker.as_ref() { - self.done = next_marker.is_empty(); + ctx.done = next_marker.is_empty(); }; - self.next_marker = output.next_marker.clone().unwrap_or_default(); + ctx.token = output.next_marker.clone().unwrap_or_default(); let prefixes = output.blobs.blob_prefix; - let mut entries = Vec::with_capacity(prefixes.len() + output.blobs.blob.len()); for prefix in prefixes { let de = oio::Entry::new( @@ -93,7 +82,7 @@ impl oio::List for AzblobLister { Metadata::new(EntryMode::DIR), ); - entries.push(de) + ctx.entries.push_back(de) } for object in output.blobs.blob { @@ -116,10 +105,10 @@ impl oio::List for AzblobLister { let de = oio::Entry::new(&build_rel_path(&self.core.root, &object.name), meta); - entries.push(de); + ctx.entries.push_back(de); } - Ok(Some(entries)) + Ok(()) } } diff --git a/core/src/services/azdls/backend.rs b/core/src/services/azdls/backend.rs index f5e08abe4..b1a7366e0 100644 --- a/core/src/services/azdls/backend.rs +++ b/core/src/services/azdls/backend.rs @@ -233,7 +233,7 @@ impl Accessor for AzdlsBackend { type BlockingReader = (); type Writer = AzdlsWriters; type BlockingWriter = (); - type Lister = AzdlsLister; + type Lister = oio::PageLister<AzdlsLister>; type BlockingLister = (); fn info(&self) -> AccessorInfo { @@ -366,9 +366,9 @@ impl Accessor for AzdlsBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - let op = AzdlsLister::new(self.core.clone(), path.to_string(), args.limit()); + let l = AzdlsLister::new(self.core.clone(), path.to_string(), args.limit()); - Ok((RpList::default(), op)) + Ok((RpList::default(), oio::PageLister::new(l))) } } diff --git a/core/src/services/azdls/lister.rs b/core/src/services/azdls/lister.rs index a624aa31e..db7904f94 100644 --- a/core/src/services/azdls/lister.rs +++ b/core/src/services/azdls/lister.rs @@ -31,41 +31,29 @@ pub struct AzdlsLister { path: String, limit: Option<usize>, - - continuation: String, - done: bool, } impl AzdlsLister { pub fn new(core: Arc<AzdlsCore>, path: String, limit: Option<usize>) -> Self { - Self { - core, - path, - limit, - - continuation: "".to_string(), - done: false, - } + Self { core, path, limit } } } #[async_trait] -impl oio::List for AzdlsLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - if self.done { - return Ok(None); - } - +impl oio::PageList for AzdlsLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self .core - .azdls_list(&self.path, &self.continuation, self.limit) + .azdls_list(&self.path, &ctx.token, self.limit) .await?; - // Azdls will return not found for not-exist path. + // azdls will return not found for not-exist path. if resp.status() == http::StatusCode::NOT_FOUND { resp.into_body().consume().await?; - return Ok(None); + ctx.done = true; + return Ok(()); } + if resp.status() != http::StatusCode::OK { return Err(parse_error(resp).await?); } @@ -76,19 +64,15 @@ impl oio::List for AzdlsLister { Error::new(ErrorKind::Unexpected, "header value is not valid string") .set_source(err) })?; - self.continuation = value.to_string(); + ctx.token = value.to_string(); } else { - self.continuation = "".to_string(); - self.done = true; + ctx.token = "".to_string(); + ctx.done = true; } let bs = resp.into_body().bytes().await?; - let output: Output = de::from_slice(&bs).map_err(|e| { - Error::new(ErrorKind::Unexpected, "deserialize json from response").set_source(e) - })?; - - let mut entries = Vec::with_capacity(output.paths.len()); + let output: Output = de::from_slice(&bs).map_err(new_json_deserialize_error)?; for object in output.paths { // Azdls will return `"true"` and `"false"` for is_directory. @@ -114,10 +98,10 @@ impl oio::List for AzdlsLister { let de = oio::Entry::new(&path, meta); - entries.push(de); + ctx.entries.push_back(de); } - Ok(Some(entries)) + Ok(()) } } diff --git a/core/src/services/azfile/backend.rs b/core/src/services/azfile/backend.rs index d11b34efd..3b87cb1f0 100644 --- a/core/src/services/azfile/backend.rs +++ b/core/src/services/azfile/backend.rs @@ -251,7 +251,7 @@ impl Accessor for AzfileBackend { type BlockingReader = (); type Writer = AzfileWriters; type BlockingWriter = (); - type Lister = AzfileLister; + type Lister = oio::PageLister<AzfileLister>; type BlockingLister = (); fn info(&self) -> AccessorInfo { @@ -396,9 +396,9 @@ impl Accessor for AzfileBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - let op = AzfileLister::new(self.core.clone(), path.to_string(), args.limit()); + let l = AzfileLister::new(self.core.clone(), path.to_string(), args.limit()); - Ok((RpList::default(), op)) + Ok((RpList::default(), oio::PageLister::new(l))) } } diff --git a/core/src/services/azfile/lister.rs b/core/src/services/azfile/lister.rs index cd17aef33..cdd8fc980 100644 --- a/core/src/services/azfile/lister.rs +++ b/core/src/services/azfile/lister.rs @@ -32,39 +32,28 @@ pub struct AzfileLister { core: Arc<AzfileCore>, path: String, limit: Option<usize>, - done: bool, - continuation: String, } impl AzfileLister { pub fn new(core: Arc<AzfileCore>, path: String, limit: Option<usize>) -> Self { - Self { - core, - path, - limit, - done: false, - continuation: "".to_string(), - } + Self { core, path, limit } } } #[async_trait] -impl oio::List for AzfileLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - if self.done { - return Ok(None); - } - +impl oio::PageList for AzfileLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self .core - .azfile_list(&self.path, &self.limit, &self.continuation) + .azfile_list(&self.path, &self.limit, &ctx.token) .await?; let status = resp.status(); if status != StatusCode::OK { if status == StatusCode::NOT_FOUND { - return Ok(None); + ctx.done = true; + return Ok(()); } return Err(parse_error(resp).await?); } @@ -73,11 +62,13 @@ impl oio::List for AzfileLister { let text = String::from_utf8(bs.to_vec()).expect("response convert to string must success"); - let results: EnumerationResults = from_str(&text).map_err(|e| { - Error::new(ErrorKind::Unexpected, "deserialize xml from response").set_source(e) - })?; + let results: EnumerationResults = from_str(&text).map_err(new_xml_deserialize_error)?; - let mut entries = Vec::new(); + if results.next_marker.is_empty() { + ctx.done = true; + } else { + ctx.token = results.next_marker; + } for file in results.entries.file { let meta = Metadata::new(EntryMode::FILE) @@ -85,7 +76,7 @@ impl oio::List for AzfileLister { .with_content_length(file.properties.content_length.unwrap_or(0)) .with_last_modified(parse_datetime_from_rfc2822(&file.properties.last_modified)?); let path = self.path.clone().trim_start_matches('/').to_string() + &file.name; - entries.push(oio::Entry::new(&path, meta)); + ctx.entries.push_back(oio::Entry::new(&path, meta)); } for dir in results.entries.directory { @@ -93,20 +84,10 @@ impl oio::List for AzfileLister { .with_etag(dir.properties.etag) .with_last_modified(parse_datetime_from_rfc2822(&dir.properties.last_modified)?); let path = self.path.clone().trim_start_matches('/').to_string() + &dir.name + "/"; - entries.push(oio::Entry::new(&path, meta)); + ctx.entries.push_back(oio::Entry::new(&path, meta)); } - if results.next_marker.is_empty() { - self.done = true; - } else { - self.continuation = results.next_marker; - } - - if entries.is_empty() { - Ok(None) - } else { - Ok(Some(entries)) - } + Ok(()) } } diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs index dbf2f9b1f..cf6616c34 100644 --- a/core/src/services/cos/backend.rs +++ b/core/src/services/cos/backend.rs @@ -245,7 +245,7 @@ impl Accessor for CosBackend { type BlockingReader = (); type Writer = CosWriters; type BlockingWriter = (); - type Lister = CosLister; + type Lister = oio::PageLister<CosLister>; type BlockingLister = (); fn info(&self) -> AccessorInfo { @@ -422,9 +422,7 @@ impl Accessor for CosBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - Ok(( - RpList::default(), - CosLister::new(self.core.clone(), path, args.recursive(), args.limit()), - )) + let l = CosLister::new(self.core.clone(), path, args.recursive(), args.limit()); + Ok((RpList::default(), oio::PageLister::new(l))) } } diff --git a/core/src/services/cos/lister.rs b/core/src/services/cos/lister.rs index 55d4bd812..a6a402a0d 100644 --- a/core/src/services/cos/lister.rs +++ b/core/src/services/cos/lister.rs @@ -36,9 +36,6 @@ pub struct CosLister { path: String, delimiter: &'static str, limit: Option<usize>, - - next_marker: String, - done: bool, } impl CosLister { @@ -49,23 +46,16 @@ impl CosLister { path: path.to_string(), delimiter, limit, - - next_marker: "".to_string(), - done: false, } } } #[async_trait] -impl oio::List for CosLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - if self.done { - return Ok(None); - } - +impl oio::PageList for CosLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self .core - .cos_list_objects(&self.path, &self.next_marker, self.delimiter, self.limit) + .cos_list_objects(&self.path, &ctx.token, self.delimiter, self.limit) .await?; if resp.status() != http::StatusCode::OK { @@ -80,22 +70,19 @@ impl oio::List for CosLister { // Try our best to check whether this list is done. // // - Check `next_marker` - self.done = match output.next_marker.as_ref() { + ctx.done = match output.next_marker.as_ref() { None => true, Some(next_marker) => next_marker.is_empty(), }; - self.next_marker = output.next_marker.clone().unwrap_or_default(); - - let common_prefixes = output.common_prefixes; - let mut entries = Vec::with_capacity(common_prefixes.len() + output.contents.len()); + ctx.token = output.next_marker.clone().unwrap_or_default(); - for prefix in common_prefixes { + for prefix in output.common_prefixes { let de = oio::Entry::new( &build_rel_path(&self.core.root, &prefix.prefix), Metadata::new(EntryMode::DIR), ); - entries.push(de); + ctx.entries.push_back(de); } for object in output.contents { @@ -107,10 +94,10 @@ impl oio::List for CosLister { let de = oio::Entry::new(&build_rel_path(&self.core.root, &object.key), meta); - entries.push(de); + ctx.entries.push_back(de); } - Ok(Some(entries)) + Ok(()) } } diff --git a/core/src/services/dbfs/backend.rs b/core/src/services/dbfs/backend.rs index 4b13e10ba..8da90e2b7 100644 --- a/core/src/services/dbfs/backend.rs +++ b/core/src/services/dbfs/backend.rs @@ -158,7 +158,7 @@ impl Accessor for DbfsBackend { type BlockingReader = (); type Writer = oio::OneShotWriter<DbfsWriter>; type BlockingWriter = (); - type Lister = DbfsLister; + type Lister = oio::PageLister<DbfsLister>; type BlockingLister = (); fn info(&self) -> AccessorInfo { @@ -276,9 +276,9 @@ impl Accessor for DbfsBackend { } async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> { - let op = DbfsLister::new(self.core.clone(), path.to_string()); + let l = DbfsLister::new(self.core.clone(), path.to_string()); - Ok((RpList::default(), op)) + Ok((RpList::default(), oio::PageLister::new(l))) } } diff --git a/core/src/services/dbfs/lister.rs b/core/src/services/dbfs/lister.rs index fd27b912b..093b4c478 100644 --- a/core/src/services/dbfs/lister.rs +++ b/core/src/services/dbfs/lister.rs @@ -30,32 +30,24 @@ use super::error::parse_error; pub struct DbfsLister { core: Arc<DbfsCore>, path: String, - done: bool, } impl DbfsLister { pub fn new(core: Arc<DbfsCore>, path: String) -> Self { - Self { - core, - path, - done: false, - } + Self { core, path } } } #[async_trait] -impl oio::List for DbfsLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - if self.done { - return Ok(None); - } - +impl oio::PageList for DbfsLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let response = self.core.dbfs_list(&self.path).await?; let status_code = response.status(); if !status_code.is_success() { if status_code == StatusCode::NOT_FOUND { - return Ok(None); + ctx.done = true; + return Ok(()); } let error = parse_error(response).await?; return Err(error); @@ -65,11 +57,9 @@ impl oio::List for DbfsLister { let mut decoded_response = serde_json::from_slice::<DbfsOutputList>(&bytes).map_err(new_json_deserialize_error)?; - self.done = true; - - let mut entries = Vec::with_capacity(decoded_response.files.len()); + ctx.done = true; - while let Some(status) = decoded_response.files.pop() { + for status in decoded_response.files { let entry: oio::Entry = match status.is_dir { true => { let normalized_path = format!("{}/", &status.path); @@ -88,9 +78,9 @@ impl oio::List for DbfsLister { oio::Entry::new(&status.path, meta) } }; - entries.push(entry); + ctx.entries.push_back(entry); } - Ok(Some(entries)) + Ok(()) } } diff --git a/core/src/services/fs/lister.rs b/core/src/services/fs/lister.rs index 403f674e8..13108b691 100644 --- a/core/src/services/fs/lister.rs +++ b/core/src/services/fs/lister.rs @@ -15,10 +15,14 @@ // specific language governing permissions and limitations // under the License. +use std::fs::FileType; use std::path::Path; use std::path::PathBuf; +use std::task::{ready, Context, Poll}; use async_trait::async_trait; +use futures::future::BoxFuture; +use futures::FutureExt; use crate::raw::*; use crate::EntryMode; @@ -30,6 +34,8 @@ pub struct FsLister<P> { size: usize, rd: P, + + fut: Option<BoxFuture<'static, (tokio::fs::DirEntry, Result<FileType>)>>, } impl<P> FsLister<P> { @@ -38,19 +44,35 @@ impl<P> FsLister<P> { root: root.to_owned(), size: limit.unwrap_or(1000), rd, + + fut: None, } } } +/// # Safety +/// +/// We will only take `&mut Self` reference for FsLister. +unsafe impl<P> Sync for FsLister<P> {} + #[async_trait] impl oio::List for FsLister<tokio::fs::ReadDir> { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - let mut oes: Vec<oio::Entry> = Vec::with_capacity(self.size); - - for _ in 0..self.size { - let de = match self.rd.next_entry().await.map_err(new_std_io_error)? { - Some(de) => de, - None => break, + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + if let Some(fut) = self.fut.as_mut() { + let (de, ft) = futures::ready!(fut.poll_unpin(cx)); + let ft = match ft { + Ok(ft) => { + self.fut = None; + ft + } + Err(e) => { + let fut = async move { + let ft = de.file_type().await.map_err(new_std_io_error); + (de, ft) + }; + self.fut = Some(Box::pin(fut)); + return Poll::Ready(Err(e)); + } }; let entry_path = de.path(); @@ -62,25 +84,30 @@ impl oio::List for FsLister<tokio::fs::ReadDir> { .replace('\\', "/"), ); - // On Windows and most Unix platforms this function is free - // (no extra system calls needed), but some Unix platforms may - // require the equivalent call to symlink_metadata to learn about - // the target file type. - let file_type = de.file_type().await.map_err(new_std_io_error)?; - - let d = if file_type.is_file() { + let d = if ft.is_file() { oio::Entry::new(&rel_path, Metadata::new(EntryMode::FILE)) - } else if file_type.is_dir() { + } else if ft.is_dir() { // Make sure we are returning the correct path. oio::Entry::new(&format!("{rel_path}/"), Metadata::new(EntryMode::DIR)) } else { oio::Entry::new(&rel_path, Metadata::new(EntryMode::Unknown)) }; - oes.push(d) + return Poll::Ready(Ok(Some(d))); } - Ok(if oes.is_empty() { None } else { Some(oes) }) + let de = ready!(self.rd.poll_next_entry(cx)).map_err(new_std_io_error)?; + match de { + Some(de) => { + let fut = async move { + let ft = de.file_type().await.map_err(new_std_io_error); + (de, ft) + }; + self.fut = Some(Box::pin(fut)); + self.poll_next(cx) + } + None => Poll::Ready(Ok(None)), + } } } diff --git a/core/src/services/ftp/lister.rs b/core/src/services/ftp/lister.rs index ba0892e8c..354bd4379 100644 --- a/core/src/services/ftp/lister.rs +++ b/core/src/services/ftp/lister.rs @@ -17,6 +17,7 @@ use std::str; use std::str::FromStr; +use std::task::{Context, Poll}; use std::vec::IntoIter; use async_trait::async_trait; @@ -43,35 +44,29 @@ impl FtpLister { #[async_trait] impl oio::List for FtpLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - let mut oes: Vec<oio::Entry> = Vec::with_capacity(self.size); + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + let de = match self.file_iter.next() { + Some(file_str) => File::from_str(file_str.as_str()).map_err(|e| { + Error::new(ErrorKind::Unexpected, "parse file from response").set_source(e) + })?, + None => return Poll::Ready(Ok(None)), + }; - for _ in 0..self.size { - let de = match self.file_iter.next() { - Some(file_str) => File::from_str(file_str.as_str()).map_err(|e| { - Error::new(ErrorKind::Unexpected, "parse file from response").set_source(e) - })?, - None => break, - }; + let path = self.path.to_string() + de.name(); - let path = self.path.to_string() + de.name(); + let entry = if de.is_file() { + oio::Entry::new( + &path, + Metadata::new(EntryMode::FILE) + .with_content_length(de.size() as u64) + .with_last_modified(de.modified().into()), + ) + } else if de.is_directory() { + oio::Entry::new(&format!("{}/", &path), Metadata::new(EntryMode::DIR)) + } else { + oio::Entry::new(&path, Metadata::new(EntryMode::Unknown)) + }; - let d = if de.is_file() { - oio::Entry::new( - &path, - Metadata::new(EntryMode::FILE) - .with_content_length(de.size() as u64) - .with_last_modified(de.modified().into()), - ) - } else if de.is_directory() { - oio::Entry::new(&format!("{}/", &path), Metadata::new(EntryMode::DIR)) - } else { - oio::Entry::new(&path, Metadata::new(EntryMode::Unknown)) - }; - - oes.push(d) - } - - Ok(if oes.is_empty() { None } else { Some(oes) }) + Poll::Ready(Ok(Some(entry))) } } diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs index c2340b7c5..d528479dc 100644 --- a/core/src/services/gcs/backend.rs +++ b/core/src/services/gcs/backend.rs @@ -312,7 +312,7 @@ impl Accessor for GcsBackend { type BlockingReader = (); type Writer = GcsWriters; type BlockingWriter = (); - type Lister = GcsLister; + type Lister = oio::PageLister<GcsLister>; type BlockingLister = (); fn info(&self) -> AccessorInfo { @@ -472,16 +472,15 @@ impl Accessor for GcsBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - Ok(( - RpList::default(), - GcsLister::new( - self.core.clone(), - path, - args.recursive(), - args.limit(), - args.start_after(), - ), - )) + let l = GcsLister::new( + self.core.clone(), + path, + args.recursive(), + args.limit(), + args.start_after(), + ); + + Ok((RpList::default(), oio::PageLister::new(l))) } async fn batch(&self, args: OpBatch) -> Result<RpBatch> { diff --git a/core/src/services/gcs/lister.rs b/core/src/services/gcs/lister.rs index 97496f353..244ca2a93 100644 --- a/core/src/services/gcs/lister.rs +++ b/core/src/services/gcs/lister.rs @@ -38,9 +38,6 @@ pub struct GcsLister { /// Filter results to objects whose names are lexicographically /// **equal to or after** startOffset start_after: Option<String>, - - page_token: String, - done: bool, } impl GcsLister { @@ -60,28 +57,25 @@ impl GcsLister { delimiter, limit, start_after: start_after.map(String::from), - - page_token: "".to_string(), - done: false, } } } #[async_trait] -impl oio::List for GcsLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - if self.done { - return Ok(None); - } - +impl oio::PageList for GcsLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self .core .gcs_list_objects( &self.path, - &self.page_token, + &ctx.token, self.delimiter, self.limit, - self.start_after.clone(), + if ctx.token.is_empty() { + self.start_after.clone() + } else { + None + }, ) .await?; @@ -94,20 +88,18 @@ impl oio::List for GcsLister { serde_json::from_slice(&bytes).map_err(new_json_deserialize_error)?; if let Some(token) = &output.next_page_token { - self.page_token = token.clone(); + ctx.token = token.clone(); } else { - self.done = true; + ctx.done = true; } - let mut entries = Vec::with_capacity(output.prefixes.len() + output.items.len()); - for prefix in output.prefixes { let de = oio::Entry::new( &build_rel_path(&self.core.root, &prefix), Metadata::new(EntryMode::DIR), ); - entries.push(de); + ctx.entries.push_back(de); } for object in output.items { @@ -139,10 +131,10 @@ impl oio::List for GcsLister { let de = oio::Entry::new(path, meta); - entries.push(de); + ctx.entries.push_back(de); } - Ok(Some(entries)) + Ok(()) } } diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs index f0d9da1b1..8f82ede86 100644 --- a/core/src/services/gdrive/backend.rs +++ b/core/src/services/gdrive/backend.rs @@ -45,7 +45,7 @@ impl Accessor for GdriveBackend { type BlockingReader = (); type Writer = oio::OneShotWriter<GdriveWriter>; type BlockingWriter = (); - type Lister = GdriveLister; + type Lister = oio::PageLister<GdriveLister>; type BlockingLister = (); fn info(&self) -> AccessorInfo { @@ -213,10 +213,8 @@ impl Accessor for GdriveBackend { } async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> { - Ok(( - RpList::default(), - GdriveLister::new(path.into(), self.core.clone()), - )) + let l = GdriveLister::new(path.into(), self.core.clone()); + Ok((RpList::default(), oio::PageLister::new(l))) } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> { diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs index 3239b281b..f6517419d 100644 --- a/core/src/services/gdrive/core.rs +++ b/core/src/services/gdrive/core.rs @@ -310,7 +310,7 @@ impl GdriveCore { &self, path: &str, page_size: i32, - next_page_token: Option<String>, + next_page_token: &str, ) -> Result<Response<IncomingAsyncBody>> { let file_id = self.get_file_id_by_path(path).await; @@ -341,22 +341,13 @@ impl GdriveCore { }, }; - let url = match next_page_token { - Some(page_token) => { - format!( - "https://www.googleapis.com/drive/v3/files?pageSize={}&pageToken={}&q={}", - page_size, - page_token, - percent_encode_path(q.as_str()) - ) - } - None => { - format!( - "https://www.googleapis.com/drive/v3/files?pageSize={}&q={}", - page_size, - percent_encode_path(q.as_str()) - ) - } + let mut url = format!( + "https://www.googleapis.com/drive/v3/files?pageSize={}&q={}", + page_size, + percent_encode_path(&q) + ); + if !next_page_token.is_empty() { + url += &format!("&pageToken={next_page_token}"); }; let mut req = Request::get(&url) diff --git a/core/src/services/gdrive/lister.rs b/core/src/services/gdrive/lister.rs index 53d8f86c4..053a0fabb 100644 --- a/core/src/services/gdrive/lister.rs +++ b/core/src/services/gdrive/lister.rs @@ -23,80 +23,56 @@ use http::StatusCode; use super::core::GdriveCore; use super::core::GdriveFileList; use super::error::parse_error; -use crate::raw::build_rel_path; -use crate::raw::build_rooted_abs_path; -use crate::raw::new_json_deserialize_error; -use crate::raw::oio::{self}; -use crate::EntryMode; -use crate::Metadata; -use crate::Result; +use crate::raw::*; +use crate::*; + pub struct GdriveLister { path: String, core: Arc<GdriveCore>, - next_page_token: Option<String>, - done: bool, } impl GdriveLister { pub fn new(path: String, core: Arc<GdriveCore>) -> Self { - Self { - path, - core, - next_page_token: None, - done: false, - } + Self { path, core } } } #[async_trait] -impl oio::List for GdriveLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - if self.done { - return Ok(None); - } - - let resp = self - .core - .gdrive_list(&self.path, 100, self.next_page_token.clone()) - .await?; +impl oio::PageList for GdriveLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { + let resp = self.core.gdrive_list(&self.path, 100, &ctx.token).await?; let bytes = match resp.status() { StatusCode::OK => resp.into_body().bytes().await?, _ => return Err(parse_error(resp).await?), }; - if bytes.is_empty() { - return Ok(None); - } - let decoded_response = serde_json::from_slice::<GdriveFileList>(&bytes).map_err(new_json_deserialize_error)?; if let Some(next_page_token) = decoded_response.next_page_token { - self.next_page_token = Some(next_page_token); + ctx.token = next_page_token; } else { - self.done = true; + ctx.done = true; } - let entries: Vec<oio::Entry> = decoded_response - .files - .into_iter() - .map(|mut file| { - let file_type = if file.mime_type.as_str() == "application/vnd.google-apps.folder" { - file.name = format!("{}/", file.name); - EntryMode::DIR - } else { - EntryMode::FILE - }; + for mut file in decoded_response.files { + let file_type = if file.mime_type.as_str() == "application/vnd.google-apps.folder" { + file.name = format!("{}/", file.name); + EntryMode::DIR + } else { + EntryMode::FILE + }; - let root = &self.core.root; - let path = format!("{}{}", build_rooted_abs_path(root, &self.path), file.name); - let normalized_path = build_rel_path(root, &path); + let root = &self.core.root; + let path = format!("{}{}", build_rooted_abs_path(root, &self.path), file.name); + let normalized_path = build_rel_path(root, &path); - oio::Entry::new(&normalized_path, Metadata::new(file_type)) - }) - .collect(); + let entry = oio::Entry::new(&normalized_path, Metadata::new(file_type)); + + ctx.entries.push_back(entry); + } - Ok(Some(entries)) + Ok(()) } } diff --git a/core/src/services/hdfs/lister.rs b/core/src/services/hdfs/lister.rs index 5e31c1e14..ea8a57572 100644 --- a/core/src/services/hdfs/lister.rs +++ b/core/src/services/hdfs/lister.rs @@ -16,6 +16,7 @@ // under the License. use async_trait::async_trait; +use std::task::{Context, Poll}; use crate::raw::*; use crate::EntryMode; @@ -42,33 +43,27 @@ impl HdfsLister { #[async_trait] impl oio::List for HdfsLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - let mut oes: Vec<oio::Entry> = Vec::with_capacity(self.size); - - for _ in 0..self.size { - let de = match self.rd.next() { - Some(de) => de, - None => break, - }; - - let path = build_rel_path(&self.root, de.path()); - - let d = if de.is_file() { - let meta = Metadata::new(EntryMode::FILE) - .with_content_length(de.len()) - .with_last_modified(de.modified().into()); - oio::Entry::new(&path, meta) - } else if de.is_dir() { - // Make sure we are returning the correct path. - oio::Entry::new(&format!("{path}/"), Metadata::new(EntryMode::DIR)) - } else { - oio::Entry::new(&path, Metadata::new(EntryMode::Unknown)) - }; - - oes.push(d) - } - - Ok(if oes.is_empty() { None } else { Some(oes) }) + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + let de = match self.rd.next() { + Some(de) => de, + None => return Poll::Ready(Ok(None)), + }; + + let path = build_rel_path(&self.root, de.path()); + + let entry = if de.is_file() { + let meta = Metadata::new(EntryMode::FILE) + .with_content_length(de.len()) + .with_last_modified(de.modified().into()); + oio::Entry::new(&path, meta) + } else if de.is_dir() { + // Make sure we are returning the correct path. + oio::Entry::new(&format!("{path}/"), Metadata::new(EntryMode::DIR)) + } else { + oio::Entry::new(&path, Metadata::new(EntryMode::Unknown)) + }; + + Poll::Ready(Ok(Some(entry))) } } diff --git a/core/src/services/ipfs/backend.rs b/core/src/services/ipfs/backend.rs index 2c1681dbe..915e2b798 100644 --- a/core/src/services/ipfs/backend.rs +++ b/core/src/services/ipfs/backend.rs @@ -165,7 +165,7 @@ impl Accessor for IpfsBackend { type BlockingReader = (); type Writer = (); type BlockingWriter = (); - type Lister = DirStream; + type Lister = oio::PageLister<DirStream>; type BlockingLister = (); fn info(&self) -> AccessorInfo { @@ -351,10 +351,8 @@ impl Accessor for IpfsBackend { } async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> { - Ok(( - RpList::default(), - DirStream::new(Arc::new(self.clone()), path), - )) + let l = DirStream::new(Arc::new(self.clone()), path); + Ok((RpList::default(), oio::PageLister::new(l))) } } @@ -415,7 +413,6 @@ impl IpfsBackend { pub struct DirStream { backend: Arc<IpfsBackend>, path: String, - consumed: bool, } impl DirStream { @@ -423,18 +420,13 @@ impl DirStream { Self { backend, path: path.to_string(), - consumed: false, } } } #[async_trait] -impl oio::List for DirStream { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - if self.consumed { - return Ok(None); - } - +impl oio::PageList for DirStream { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self.backend.ipfs_list(&self.path).await?; if resp.status() != StatusCode::OK { @@ -452,8 +444,6 @@ impl oio::List for DirStream { .map(|v| v.name.unwrap()) .collect::<Vec<String>>(); - let mut oes = Vec::with_capacity(names.len()); - for mut name in names { let meta = self .backend @@ -465,10 +455,10 @@ impl oio::List for DirStream { name += "/"; } - oes.push(oio::Entry::new(&name, meta)) + ctx.entries.push_back(oio::Entry::new(&name, meta)) } - self.consumed = true; - Ok(Some(oes)) + ctx.done = true; + Ok(()) } } diff --git a/core/src/services/ipmfs/backend.rs b/core/src/services/ipmfs/backend.rs index 26c06d569..7a9da9b4e 100644 --- a/core/src/services/ipmfs/backend.rs +++ b/core/src/services/ipmfs/backend.rs @@ -66,7 +66,7 @@ impl Accessor for IpmfsBackend { type BlockingReader = (); type Writer = oio::OneShotWriter<IpmfsWriter>; type BlockingWriter = (); - type Lister = IpmfsLister; + type Lister = oio::PageLister<IpmfsLister>; type BlockingLister = (); fn info(&self) -> AccessorInfo { @@ -171,10 +171,8 @@ impl Accessor for IpmfsBackend { } async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> { - Ok(( - RpList::default(), - IpmfsLister::new(Arc::new(self.clone()), &self.root, path), - )) + let l = IpmfsLister::new(Arc::new(self.clone()), &self.root, path); + Ok((RpList::default(), oio::PageLister::new(l))) } } diff --git a/core/src/services/ipmfs/lister.rs b/core/src/services/ipmfs/lister.rs index 535c982b1..902d32592 100644 --- a/core/src/services/ipmfs/lister.rs +++ b/core/src/services/ipmfs/lister.rs @@ -47,12 +47,8 @@ impl IpmfsLister { } #[async_trait] -impl oio::List for IpmfsLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - if self.consumed { - return Ok(None); - } - +impl oio::PageList for IpmfsLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self.backend.ipmfs_ls(&self.path).await?; if resp.status() != StatusCode::OK { @@ -64,33 +60,24 @@ impl oio::List for IpmfsLister { serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?; // Mark dir stream has been consumed. - self.consumed = true; - - Ok(Some( - entries_body - .entries - .unwrap_or_default() - .into_iter() - .map(|object| { - let path = match object.mode() { - EntryMode::FILE => { - format!("{}{}", &self.path, object.name) - } - EntryMode::DIR => { - format!("{}{}/", &self.path, object.name) - } - EntryMode::Unknown => unreachable!(), - }; - - let path = build_rel_path(&self.root, &path); - - oio::Entry::new( - &path, - Metadata::new(object.mode()).with_content_length(object.size), - ) - }) - .collect(), - )) + ctx.done = true; + + for object in entries_body.entries.unwrap_or_default() { + let path = match object.mode() { + EntryMode::FILE => format!("{}{}", &self.path, object.name), + EntryMode::DIR => format!("{}{}/", &self.path, object.name), + EntryMode::Unknown => unreachable!(), + }; + + let path = build_rel_path(&self.root, &path); + + ctx.entries.push_back(oio::Entry::new( + &path, + Metadata::new(object.mode()).with_content_length(object.size), + )); + } + + Ok(()) } } diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index 6cfe43e4f..4f84852b5 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -252,7 +252,7 @@ impl Accessor for ObsBackend { type BlockingReader = (); type Writer = ObsWriters; type BlockingWriter = (); - type Lister = ObsLister; + type Lister = oio::PageLister<ObsLister>; type BlockingLister = (); fn info(&self) -> AccessorInfo { @@ -428,9 +428,7 @@ impl Accessor for ObsBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - Ok(( - RpList::default(), - ObsLister::new(self.core.clone(), path, args.recursive(), args.limit()), - )) + let l = ObsLister::new(self.core.clone(), path, args.recursive(), args.limit()); + Ok((RpList::default(), oio::PageLister::new(l))) } } diff --git a/core/src/services/obs/lister.rs b/core/src/services/obs/lister.rs index c96f51e3e..831427ab1 100644 --- a/core/src/services/obs/lister.rs +++ b/core/src/services/obs/lister.rs @@ -36,9 +36,6 @@ pub struct ObsLister { path: String, delimiter: &'static str, limit: Option<usize>, - - next_marker: String, - done: bool, } impl ObsLister { @@ -50,23 +47,16 @@ impl ObsLister { path: path.to_string(), delimiter, limit, - - next_marker: "".to_string(), - done: false, } } } #[async_trait] -impl oio::List for ObsLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - if self.done { - return Ok(None); - } - +impl oio::PageList for ObsLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self .core - .obs_list_objects(&self.path, &self.next_marker, self.delimiter, self.limit) + .obs_list_objects(&self.path, &ctx.token, self.delimiter, self.limit) .await?; if resp.status() != http::StatusCode::OK { @@ -81,14 +71,13 @@ impl oio::List for ObsLister { // Try our best to check whether this list is done. // // - Check `next_marker` - self.done = match output.next_marker.as_ref() { + ctx.done = match output.next_marker.as_ref() { None => true, Some(next_marker) => next_marker.is_empty(), }; - self.next_marker = output.next_marker.clone().unwrap_or_default(); + ctx.token = output.next_marker.clone().unwrap_or_default(); let common_prefixes = output.common_prefixes; - let mut entries = Vec::with_capacity(common_prefixes.len() + output.contents.len()); for prefix in common_prefixes { let de = oio::Entry::new( @@ -96,7 +85,7 @@ impl oio::List for ObsLister { Metadata::new(EntryMode::DIR), ); - entries.push(de); + ctx.entries.push_back(de); } for object in output.contents { @@ -108,10 +97,10 @@ impl oio::List for ObsLister { let de = oio::Entry::new(&build_rel_path(&self.core.root, &object.key), meta); - entries.push(de); + ctx.entries.push_back(de); } - Ok(Some(entries)) + Ok(()) } } diff --git a/core/src/services/onedrive/backend.rs b/core/src/services/onedrive/backend.rs index bef175735..ba4e5b8d3 100644 --- a/core/src/services/onedrive/backend.rs +++ b/core/src/services/onedrive/backend.rs @@ -66,7 +66,7 @@ impl Accessor for OnedriveBackend { type BlockingReader = (); type Writer = oio::OneShotWriter<OneDriveWriter>; type BlockingWriter = (); - type Lister = OnedriveLister; + type Lister = oio::PageLister<OnedriveLister>; type BlockingLister = (); fn info(&self) -> AccessorInfo { @@ -163,10 +163,9 @@ impl Accessor for OnedriveBackend { } async fn list(&self, path: &str, _op_list: OpList) -> Result<(RpList, Self::Lister)> { - let lister: OnedriveLister = - OnedriveLister::new(self.root.clone(), path.into(), self.clone()); + let l = OnedriveLister::new(self.root.clone(), path.into(), self.clone()); - Ok((RpList::default(), lister)) + Ok((RpList::default(), oio::PageLister::new(l))) } async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> { diff --git a/core/src/services/onedrive/lister.rs b/core/src/services/onedrive/lister.rs index 90daeb866..12c557867 100644 --- a/core/src/services/onedrive/lister.rs +++ b/core/src/services/onedrive/lister.rs @@ -16,28 +16,19 @@ // under the License. use async_trait::async_trait; -use http::Response; use super::backend::OnedriveBackend; use super::error::parse_error; use super::graph_model::GraphApiOnedriveListResponse; use super::graph_model::ItemType; -use crate::raw::build_rel_path; -use crate::raw::build_rooted_abs_path; -use crate::raw::new_json_deserialize_error; -use crate::raw::oio::{self}; -use crate::raw::percent_encode_path; -use crate::raw::IncomingAsyncBody; -use crate::EntryMode; -use crate::Metadata; -use crate::Result; +use crate::raw::oio; +use crate::raw::*; +use crate::*; pub struct OnedriveLister { root: String, path: String, backend: OnedriveBackend, - next_link: Option<String>, - done: bool, } impl OnedriveLister { @@ -48,92 +39,80 @@ impl OnedriveLister { root, path, backend, - next_link: None, - done: false, } } } #[async_trait] -impl oio::List for OnedriveLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - if self.done { - return Ok(None); - } - let response = self.onedrive_get().await?; +impl oio::PageList for OnedriveLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { + let request_url = if ctx.token.is_empty() { + let path = build_rooted_abs_path(&self.root, &self.path); + let url: String = if path == "." || path == "/" { + "https://graph.microsoft.com/v1.0/me/drive/root/children".to_string() + } else { + // According to OneDrive API examples, the path should not end with a slash. + // Reference: <https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_list_children?view=odsp-graph-online> + let path = path.strip_suffix('/').unwrap_or(""); + format!( + "https://graph.microsoft.com/v1.0/me/drive/root:{}:/children", + percent_encode_path(path), + ) + }; + url + } else { + ctx.token.clone() + }; + + let resp = self + .backend + .onedrive_get_next_list_page(&request_url) + .await?; - let status_code = response.status(); + let status_code = resp.status(); if !status_code.is_success() { if status_code == http::StatusCode::NOT_FOUND { - return Ok(None); + ctx.done = true; + return Ok(()); } - let error = parse_error(response).await?; + let error = parse_error(resp).await?; return Err(error); } - let bytes = response.into_body().bytes().await?; + let bytes = resp.into_body().bytes().await?; let decoded_response = serde_json::from_slice::<GraphApiOnedriveListResponse>(&bytes) .map_err(new_json_deserialize_error)?; if let Some(next_link) = decoded_response.next_link { - self.next_link = Some(next_link); + ctx.token = next_link; } else { - self.done = true; + ctx.done = true; } - let entries: Vec<oio::Entry> = decoded_response - .value - .into_iter() - .map(|drive_item| { - let name = drive_item.name; - let parent_path = drive_item.parent_reference.path; - let parent_path = parent_path - .strip_prefix(Self::DRIVE_ROOT_PREFIX) - .unwrap_or(""); - - let path = format!("{}/{}", parent_path, name); - - let normalized_path = build_rel_path(&self.root, &path); - - let entry: oio::Entry = match drive_item.item_type { - ItemType::Folder { .. } => { - let normalized_path = format!("{}/", normalized_path); - oio::Entry::new(&normalized_path, Metadata::new(EntryMode::DIR)) - } - ItemType::File { .. } => { - oio::Entry::new(&normalized_path, Metadata::new(EntryMode::FILE)) - } - }; - entry - }) - .collect(); - - Ok(Some(entries)) - } -} - -impl OnedriveLister { - async fn onedrive_get(&mut self) -> Result<Response<IncomingAsyncBody>> { - let request_url = if let Some(next_link) = &self.next_link { - let next_link_clone = next_link.clone(); - self.next_link = None; - next_link_clone - } else { - let path = build_rooted_abs_path(&self.root, &self.path); - let url: String = if path == "." || path == "/" { - "https://graph.microsoft.com/v1.0/me/drive/root/children".to_string() - } else { - // According to OneDrive API examples, the path should not end with a slash. - // Reference: <https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_list_children?view=odsp-graph-online> - let path = path.strip_suffix('/').unwrap_or(""); - format!( - "https://graph.microsoft.com/v1.0/me/drive/root:{}:/children", - percent_encode_path(path), - ) + for drive_item in decoded_response.value { + let name = drive_item.name; + let parent_path = drive_item.parent_reference.path; + let parent_path = parent_path + .strip_prefix(Self::DRIVE_ROOT_PREFIX) + .unwrap_or(""); + + let path = format!("{}/{}", parent_path, name); + + let normalized_path = build_rel_path(&self.root, &path); + + let entry: oio::Entry = match drive_item.item_type { + ItemType::Folder { .. } => { + let normalized_path = format!("{}/", normalized_path); + oio::Entry::new(&normalized_path, Metadata::new(EntryMode::DIR)) + } + ItemType::File { .. } => { + oio::Entry::new(&normalized_path, Metadata::new(EntryMode::FILE)) + } }; - url - }; - self.backend.onedrive_get_next_list_page(&request_url).await + ctx.entries.push_back(entry) + } + + Ok(()) } } diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index c5fc3441c..a9e512429 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -380,7 +380,7 @@ impl Accessor for OssBackend { type BlockingReader = (); type Writer = OssWriters; type BlockingWriter = (); - type Lister = OssLister; + type Lister = oio::PageLister<OssLister>; type BlockingLister = (); fn info(&self) -> AccessorInfo { @@ -545,16 +545,14 @@ impl Accessor for OssBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - Ok(( - RpList::default(), - OssLister::new( - self.core.clone(), - path, - args.recursive(), - args.limit(), - args.start_after(), - ), - )) + let l = OssLister::new( + self.core.clone(), + path, + args.recursive(), + args.limit(), + args.start_after(), + ); + Ok((RpList::default(), oio::PageLister::new(l))) } async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> { diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs index f0feb59d2..c90439de1 100644 --- a/core/src/services/oss/core.rs +++ b/core/src/services/oss/core.rs @@ -324,7 +324,7 @@ impl OssCore { pub fn oss_list_object_request( &self, path: &str, - token: Option<&str>, + token: &str, delimiter: &str, limit: Option<usize>, start_after: Option<String>, @@ -347,13 +347,9 @@ impl OssCore { } // continuation_token - if let Some(continuation_token) = token { - write!( - url, - "&continuation-token={}", - percent_encode_path(continuation_token) - ) - .expect("write into string must succeed"); + if !token.is_empty() { + write!(url, "&continuation-token={}", percent_encode_path(token)) + .expect("write into string must succeed"); } // start-after @@ -446,7 +442,7 @@ impl OssCore { pub async fn oss_list_object( &self, path: &str, - token: Option<&str>, + token: &str, delimiter: &str, limit: Option<usize>, start_after: Option<String>, diff --git a/core/src/services/oss/lister.rs b/core/src/services/oss/lister.rs index 7b4b0bce3..6412cf8bf 100644 --- a/core/src/services/oss/lister.rs +++ b/core/src/services/oss/lister.rs @@ -41,9 +41,6 @@ pub struct OssLister { /// Filter results to objects whose names are lexicographically /// **equal to or after** startOffset start_after: Option<String>, - - token: Option<String>, - done: bool, } impl OssLister { @@ -61,28 +58,25 @@ impl OssLister { delimiter, limit, start_after: start_after.map(String::from), - token: None, - - done: false, } } } #[async_trait] -impl oio::List for OssLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - if self.done { - return Ok(None); - } - +impl oio::PageList for OssLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self .core .oss_list_object( &self.path, - self.token.as_deref(), + &ctx.token, self.delimiter, self.limit, - self.start_after.clone(), + if ctx.token.is_empty() { + self.start_after.clone() + } else { + None + }, ) .await?; @@ -95,17 +89,15 @@ impl oio::List for OssLister { let output: ListBucketOutput = de::from_reader(bs.reader()) .map_err(|e| Error::new(ErrorKind::Unexpected, "deserialize xml").set_source(e))?; - self.done = !output.is_truncated; - self.token = output.next_continuation_token.clone(); - - let mut entries = Vec::with_capacity(output.common_prefixes.len() + output.contents.len()); + ctx.done = !output.is_truncated; + ctx.token = output.next_continuation_token.unwrap_or_default(); for prefix in output.common_prefixes { let de = oio::Entry::new( &build_rel_path(&self.core.root, &prefix.prefix), Metadata::new(EntryMode::DIR), ); - entries.push(de); + ctx.entries.push_back(de); } for object in output.contents { @@ -129,10 +121,10 @@ impl oio::List for OssLister { let path = unescape(&rel) .map_err(|e| Error::new(ErrorKind::Unexpected, "excapse xml").set_source(e))?; let de = oio::Entry::new(&path, meta); - entries.push(de); + ctx.entries.push_back(de); } - Ok(Some(entries)) + Ok(()) } } diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 7e1ccfc85..56663f522 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -969,7 +969,7 @@ impl Accessor for S3Backend { type BlockingReader = (); type Writer = S3Writers; type BlockingWriter = (); - type Lister = S3Lister; + type Lister = oio::PageLister<S3Lister>; type BlockingLister = (); fn info(&self) -> AccessorInfo { @@ -1134,16 +1134,14 @@ impl Accessor for S3Backend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - Ok(( - RpList::default(), - S3Lister::new( - self.core.clone(), - path, - args.recursive(), - args.limit(), - args.start_after(), - ), - )) + let l = S3Lister::new( + self.core.clone(), + path, + args.recursive(), + args.limit(), + args.start_after(), + ); + Ok((RpList::default(), oio::PageLister::new(l))) } async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> { diff --git a/core/src/services/s3/lister.rs b/core/src/services/s3/lister.rs index 431abd8f1..78d5e1ff3 100644 --- a/core/src/services/s3/lister.rs +++ b/core/src/services/s3/lister.rs @@ -38,9 +38,6 @@ pub struct S3Lister { /// Amazon S3 starts listing **after** this specified key start_after: Option<String>, - - token: String, - done: bool, } impl S3Lister { @@ -59,28 +56,26 @@ impl S3Lister { delimiter, limit, start_after: start_after.map(String::from), - - token: "".to_string(), - done: false, } } } #[async_trait] -impl oio::List for S3Lister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - if self.done { - return Ok(None); - } - +impl oio::PageList for S3Lister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let resp = self .core .s3_list_objects( &self.path, - &self.token, + &ctx.token, self.delimiter, self.limit, - self.start_after.clone(), + // State after should only be set for the first page. + if ctx.token.is_empty() { + self.start_after.clone() + } else { + None + }, ) .await?; @@ -97,16 +92,14 @@ impl oio::List for S3Lister { // - Check `is_truncated` // - Check `next_continuation_token` // - Check the length of `common_prefixes` and `contents` (very rarely case) - self.done = if let Some(is_truncated) = output.is_truncated { + ctx.done = if let Some(is_truncated) = output.is_truncated { !is_truncated } else if let Some(next_continuation_token) = output.next_continuation_token.as_ref() { next_continuation_token.is_empty() } else { output.common_prefixes.is_empty() && output.contents.is_empty() }; - self.token = output.next_continuation_token.clone().unwrap_or_default(); - - let mut entries = Vec::with_capacity(output.common_prefixes.len() + output.contents.len()); + ctx.token = output.next_continuation_token.clone().unwrap_or_default(); for prefix in output.common_prefixes { let de = oio::Entry::new( @@ -114,7 +107,7 @@ impl oio::List for S3Lister { Metadata::new(EntryMode::DIR), ); - entries.push(de); + ctx.entries.push_back(de); } for object in output.contents { @@ -139,10 +132,10 @@ impl oio::List for S3Lister { let de = oio::Entry::new(&build_rel_path(&self.core.root, &object.key), meta); - entries.push(de); + ctx.entries.push_back(de); } - Ok(Some(entries)) + Ok(()) } } diff --git a/core/src/services/sftp/lister.rs b/core/src/services/sftp/lister.rs index b130629ca..e69f92213 100644 --- a/core/src/services/sftp/lister.rs +++ b/core/src/services/sftp/lister.rs @@ -16,6 +16,7 @@ // under the License. use std::pin::Pin; +use std::task::{ready, Context, Poll}; use async_trait::async_trait; use futures::StreamExt; @@ -47,24 +48,18 @@ impl SftpLister { #[async_trait] impl oio::List for SftpLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - if self.limit == 0 { - return Ok(None); - } - - let item = self.dir.next().await; + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + let item = ready!(self.dir.poll_next_unpin(cx)).transpose()?; match item { - Some(Ok(e)) => { + Some(e) => { if e.filename().to_str() == Some(".") || e.filename().to_str() == Some("..") { - self.next().await + self.poll_next(cx) } else { - self.limit -= 1; - Ok(Some(vec![map_entry(self.prefix.as_str(), e.clone())])) + Poll::Ready(Ok(Some(map_entry(self.prefix.as_str(), e)))) } } - Some(Err(e)) => Err(e.into()), - None => Ok(None), + None => Poll::Ready(Ok(None)), } } } diff --git a/core/src/services/swift/backend.rs b/core/src/services/swift/backend.rs index 6f6b22dba..9fa58b7dd 100644 --- a/core/src/services/swift/backend.rs +++ b/core/src/services/swift/backend.rs @@ -218,7 +218,7 @@ impl Accessor for SwiftBackend { type BlockingReader = (); type Writer = oio::OneShotWriter<SwiftWriter>; type BlockingWriter = (); - type Lister = SwiftLister; + type Lister = oio::PageLister<SwiftLister>; type BlockingLister = (); fn info(&self) -> AccessorInfo { @@ -329,8 +329,8 @@ impl Accessor for SwiftBackend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - let op = SwiftLister::new(self.core.clone(), path.to_string(), args.recursive()); + let l = SwiftLister::new(self.core.clone(), path.to_string(), args.recursive()); - Ok((RpList::default(), op)) + Ok((RpList::default(), oio::PageLister::new(l))) } } diff --git a/core/src/services/swift/lister.rs b/core/src/services/swift/lister.rs index d69bb42b5..7fdfa0d5b 100644 --- a/core/src/services/swift/lister.rs +++ b/core/src/services/swift/lister.rs @@ -29,7 +29,6 @@ pub struct SwiftLister { core: Arc<SwiftCore>, path: String, delimiter: &'static str, - done: bool, } impl SwiftLister { @@ -39,18 +38,13 @@ impl SwiftLister { core, path, delimiter, - done: false, } } } #[async_trait] -impl oio::List for SwiftLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - if self.done { - return Ok(None); - } - +impl oio::PageList for SwiftLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { let response = self.core.swift_list(&self.path, self.delimiter).await?; let status_code = response.status(); @@ -60,15 +54,13 @@ impl oio::List for SwiftLister { return Err(error); } + ctx.done = true; + let bytes = response.into_body().bytes().await?; let mut decoded_response = serde_json::from_slice::<Vec<ListOpResponse>>(&bytes) .map_err(new_json_deserialize_error)?; - self.done = true; - - let mut entries = Vec::with_capacity(decoded_response.len()); - - while let Some(status) = decoded_response.pop() { + for status in decoded_response { let entry: oio::Entry = match status { ListOpResponse::Subdir { subdir } => { let meta = Metadata::new(EntryMode::DIR); @@ -99,9 +91,9 @@ impl oio::List for SwiftLister { oio::Entry::new(&name, meta) } }; - entries.push(entry); + ctx.entries.push_back(entry); } - Ok(Some(entries)) + Ok(()) } } diff --git a/core/src/services/webdav/backend.rs b/core/src/services/webdav/backend.rs index 09e0f839a..aa676c44c 100644 --- a/core/src/services/webdav/backend.rs +++ b/core/src/services/webdav/backend.rs @@ -223,7 +223,7 @@ impl Accessor for WebdavBackend { type BlockingReader = (); type Writer = oio::OneShotWriter<WebdavWriter>; type BlockingWriter = (); - type Lister = Option<WebdavLister>; + type Lister = Option<oio::PageLister<WebdavLister>>; type BlockingLister = (); fn info(&self) -> AccessorInfo { @@ -398,10 +398,9 @@ impl Accessor for WebdavBackend { let result: Multistatus = quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?; - Ok(( - RpList::default(), - Some(WebdavLister::new(&self.base_dir, &self.root, path, result)), - )) + let l = WebdavLister::new(&self.base_dir, &self.root, path, result); + + Ok((RpList::default(), Some(oio::PageLister::new(l)))) } StatusCode::NOT_FOUND if path.ends_with('/') => Ok((RpList::default(), None)), _ => Err(parse_error(resp).await?), diff --git a/core/src/services/webdav/lister.rs b/core/src/services/webdav/lister.rs index 84e97aca6..e98c77db3 100644 --- a/core/src/services/webdav/lister.rs +++ b/core/src/services/webdav/lister.rs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::mem; - use async_trait::async_trait; use serde::Deserialize; @@ -30,6 +28,7 @@ pub struct WebdavLister { } impl WebdavLister { + /// TODO: sending request in `next_page` instead of in `new`. pub fn new(base_dir: &str, root: &str, path: &str, multistates: Multistatus) -> Self { Self { base_dir: base_dir.to_string(), @@ -41,14 +40,10 @@ impl WebdavLister { } #[async_trait] -impl oio::List for WebdavLister { - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - if self.multistates.response.is_empty() { - return Ok(None); - }; - let oes = mem::take(&mut self.multistates.response); - - let mut entries = Vec::with_capacity(oes.len()); +impl oio::PageList for WebdavLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { + // Build request instead of clone here. + let oes = self.multistates.response.clone(); for res in oes { let path = res @@ -70,19 +65,20 @@ impl oio::List for WebdavLister { } let meta = res.parse_into_metadata()?; - entries.push(oio::Entry::new(&decoded_path, meta)) + ctx.entries.push_back(oio::Entry::new(&decoded_path, meta)) } + ctx.done = true; - Ok(Some(entries)) + Ok(()) } } -#[derive(Deserialize, Debug, PartialEq, Eq)] +#[derive(Deserialize, Debug, PartialEq, Eq, Clone)] pub struct Multistatus { pub response: Vec<ListOpResponse>, } -#[derive(Deserialize, Debug, PartialEq, Eq)] +#[derive(Deserialize, Debug, PartialEq, Eq, Clone)] pub struct ListOpResponse { pub href: String, pub propstat: Propstat, @@ -141,13 +137,13 @@ impl ListOpResponse { } } -#[derive(Deserialize, Debug, PartialEq, Eq)] +#[derive(Deserialize, Debug, PartialEq, Eq, Clone)] pub struct Propstat { pub prop: Prop, pub status: String, } -#[derive(Deserialize, Debug, PartialEq, Eq)] +#[derive(Deserialize, Debug, PartialEq, Eq, Clone)] pub struct Prop { #[serde(default)] pub displayname: String, @@ -158,13 +154,13 @@ pub struct Prop { pub resourcetype: ResourceTypeContainer, } -#[derive(Deserialize, Debug, PartialEq, Eq)] +#[derive(Deserialize, Debug, PartialEq, Eq, Clone)] pub struct ResourceTypeContainer { #[serde(rename = "$value")] pub value: Option<ResourceType>, } -#[derive(Deserialize, Debug, PartialEq, Eq)] +#[derive(Deserialize, Debug, PartialEq, Eq, Clone)] #[serde(rename_all = "lowercase")] pub enum ResourceType { Collection, diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs index 3352e47d0..9fa9bd150 100644 --- a/core/src/services/webhdfs/backend.rs +++ b/core/src/services/webhdfs/backend.rs @@ -31,10 +31,8 @@ use super::error::parse_error; use super::error::parse_error_msg; use super::lister::WebhdfsLister; use super::message::BooleanResp; -use super::message::DirectoryListingWrapper; use super::message::FileStatusType; use super::message::FileStatusWrapper; -use super::message::FileStatusesWrapper; use super::writer::WebhdfsWriter; use crate::raw::*; use crate::*; @@ -277,7 +275,10 @@ impl WebhdfsBackend { Ok(req) } - fn webhdfs_list_status_request(&self, path: &str) -> Result<Request<AsyncBody>> { + pub async fn webhdfs_list_status_request( + &self, + path: &str, + ) -> Result<Response<IncomingAsyncBody>> { let p = build_abs_path(&self.root, path); let mut url = format!( "{}/webhdfs/v1/{}?op=LISTSTATUS", @@ -291,29 +292,24 @@ impl WebhdfsBackend { let req = Request::get(&url) .body(AsyncBody::Empty) .map_err(new_request_build_error)?; - Ok(req) + self.client.send(req).await } - pub(super) fn webhdfs_list_status_batch_request( + pub async fn webhdfs_list_status_batch_request( &self, path: &str, - args: &OpList, - ) -> Result<Request<AsyncBody>> { + start_after: &str, + ) -> Result<Response<IncomingAsyncBody>> { let p = build_abs_path(&self.root, path); - // if it's not the first time to call LISTSTATUS_BATCH, we will add &startAfter=<CHILD> - let start_after_param = match args.start_after() { - Some(sa) if sa.is_empty() => String::new(), - Some(sa) => format!("&startAfter={}", sa), - None => String::new(), - }; - let mut url = format!( - "{}/webhdfs/v1/{}?op=LISTSTATUS_BATCH{}", + "{}/webhdfs/v1/{}?op=LISTSTATUS_BATCH", self.endpoint, percent_encode_path(&p), - start_after_param ); + if !start_after.is_empty() { + url += format!("&startAfter={}", start_after).as_str(); + } if let Some(auth) = &self.auth { url += format!("&{auth}").as_str(); } @@ -321,7 +317,7 @@ impl WebhdfsBackend { let req = Request::get(&url) .body(AsyncBody::Empty) .map_err(new_request_build_error)?; - Ok(req) + self.client.send(req).await } async fn webhdfs_read_file( @@ -402,7 +398,7 @@ impl Accessor for WebhdfsBackend { type BlockingReader = (); type Writer = oio::OneShotWriter<WebhdfsWriter>; type BlockingWriter = (); - type Lister = WebhdfsLister; + type Lister = oio::PageLister<WebhdfsLister>; type BlockingLister = (); fn info(&self) -> AccessorInfo { @@ -549,46 +545,7 @@ impl Accessor for WebhdfsBackend { } let path = path.trim_end_matches('/'); - - if !self.disable_list_batch { - let req = self.webhdfs_list_status_batch_request(path, &OpList::default())?; - let resp = self.client.send(req).await?; - match resp.status() { - StatusCode::OK => { - let bs = resp.into_body().bytes().await?; - let directory_listing = serde_json::from_slice::<DirectoryListingWrapper>(&bs) - .map_err(new_json_deserialize_error)? - .directory_listing; - let file_statuses = directory_listing.partial_listing.file_statuses.file_status; - let mut objects = WebhdfsLister::new(self.clone(), path, file_statuses); - objects.set_remaining_entries(directory_listing.remaining_entries); - Ok((RpList::default(), objects)) - } - StatusCode::NOT_FOUND => { - let objects = WebhdfsLister::new(self.clone(), path, vec![]); - Ok((RpList::default(), objects)) - } - _ => Err(parse_error(resp).await?), - } - } else { - let req = self.webhdfs_list_status_request(path)?; - let resp = self.client.send(req).await?; - match resp.status() { - StatusCode::OK => { - let bs = resp.into_body().bytes().await?; - let file_statuses = serde_json::from_slice::<FileStatusesWrapper>(&bs) - .map_err(new_json_deserialize_error)? - .file_statuses - .file_status; - let objects = WebhdfsLister::new(self.clone(), path, file_statuses); - Ok((RpList::default(), objects)) - } - StatusCode::NOT_FOUND => { - let objects = WebhdfsLister::new(self.clone(), path, vec![]); - Ok((RpList::default(), objects)) - } - _ => Err(parse_error(resp).await?), - } - } + let l = WebhdfsLister::new(self.clone(), path); + Ok((RpList::default(), oio::PageLister::new(l))) } } diff --git a/core/src/services/webhdfs/lister.rs b/core/src/services/webhdfs/lister.rs index 8c6579a5d..0857414fc 100644 --- a/core/src/services/webhdfs/lister.rs +++ b/core/src/services/webhdfs/lister.rs @@ -20,90 +20,75 @@ use http::StatusCode; use super::backend::WebhdfsBackend; use super::error::parse_error; -use super::message::DirectoryListingWrapper; -use super::message::FileStatus; -use super::message::FileStatusType; +use super::message::*; use crate::raw::*; use crate::*; pub struct WebhdfsLister { backend: WebhdfsBackend, path: String, - statuses: Vec<FileStatus>, - batch_start_after: Option<String>, - remaining_entries: u32, } impl WebhdfsLister { - pub fn new(backend: WebhdfsBackend, path: &str, statuses: Vec<FileStatus>) -> Self { + pub fn new(backend: WebhdfsBackend, path: &str) -> Self { Self { backend, path: path.to_string(), - batch_start_after: statuses.last().map(|f| f.path_suffix.clone()), - statuses, - remaining_entries: 0, } } - - pub(super) fn set_remaining_entries(&mut self, remaining_entries: u32) { - self.remaining_entries = remaining_entries; - } } #[async_trait] -impl oio::List for WebhdfsLister { - /// Returns the next page of entries. - /// - /// Note: default list status with batch, calling next will query for next batch if `remaining_entries` > 0. - async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> { - if self.statuses.is_empty() && self.remaining_entries == 0 { - return Ok(None); - } +impl oio::PageList for WebhdfsLister { + async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> { + let file_status = if self.backend.disable_list_batch { + let resp = self.backend.webhdfs_list_status_request(&self.path).await?; + match resp.status() { + StatusCode::OK => { + ctx.done = true; - return match self.backend.disable_list_batch { - true => self.webhdfs_get_next_list_statuses(), - false => { - let args = OpList::with_start_after( - OpList::default(), - &self.batch_start_after.clone().unwrap(), - ); - let req = self - .backend - .webhdfs_list_status_batch_request(&self.path, &args)?; - let resp = self.backend.client.send(req).await?; + let bs = resp.into_body().bytes().await?; + serde_json::from_slice::<FileStatusesWrapper>(&bs) + .map_err(new_json_deserialize_error)? + .file_statuses + .file_status + } + StatusCode::NOT_FOUND => { + ctx.done = true; + return Ok(()); + } + _ => return Err(parse_error(resp).await?), + } + } else { + let resp = self + .backend + .webhdfs_list_status_batch_request(&self.path, &ctx.token) + .await?; + match resp.status() { + StatusCode::OK => { + let bs = resp.into_body().bytes().await?; + let directory_listing = serde_json::from_slice::<DirectoryListingWrapper>(&bs) + .map_err(new_json_deserialize_error)? + .directory_listing; + let file_statuses = directory_listing.partial_listing.file_statuses.file_status; - match resp.status() { - StatusCode::OK => { - let bs = resp.into_body().bytes().await?; - let directory_listing = - serde_json::from_slice::<DirectoryListingWrapper>(&bs) - .map_err(new_json_deserialize_error)?; - let file_statuses = directory_listing - .directory_listing - .partial_listing - .file_statuses - .file_status; - self.remaining_entries = - directory_listing.directory_listing.remaining_entries; - self.batch_start_after = - file_statuses.last().map(|f| f.path_suffix.clone()); - self.statuses.extend(file_statuses); - self.webhdfs_get_next_list_statuses() + if directory_listing.remaining_entries == 0 { + ctx.done = true; + } else if file_statuses.len() > 0 { + ctx.token = file_statuses.last().unwrap().path_suffix.clone(); } - StatusCode::NOT_FOUND => self.webhdfs_get_next_list_statuses(), - _ => Err(parse_error(resp).await?), + + file_statuses } + StatusCode::NOT_FOUND => { + ctx.done = true; + return Ok(()); + } + _ => return Err(parse_error(resp).await?), } }; - } -} -impl WebhdfsLister { - /// Returns the next page of entries. - fn webhdfs_get_next_list_statuses(&mut self) -> Result<Option<Vec<oio::Entry>>> { - let mut entries = Vec::with_capacity(self.statuses.len()); - - while let Some(status) = self.statuses.pop() { + for status in file_status { let mut path = if self.path.is_empty() { status.path_suffix.to_string() } else { @@ -126,8 +111,9 @@ impl WebhdfsLister { path += "/" } let entry = oio::Entry::new(&path, meta); - entries.push(entry); + ctx.entries.push_back(entry); } - Ok(Some(entries)) + + Ok(()) } } diff --git a/core/src/types/list.rs b/core/src/types/list.rs index ac01496b3..5493d58db 100644 --- a/core/src/types/list.rs +++ b/core/src/types/list.rs @@ -26,11 +26,10 @@ use futures::future::BoxFuture; use futures::FutureExt; use futures::Stream; +use crate::raw::oio::List; use crate::raw::*; use crate::*; -/// Future constructed by listing. -type ListFuture = BoxFuture<'static, (oio::Lister, Result<Option<Vec<oio::Entry>>>)>; /// Future constructed by stating. type StatFuture = BoxFuture<'static, (String, Result<RpStat>)>; @@ -42,12 +41,10 @@ type StatFuture = BoxFuture<'static, (String, Result<RpStat>)>; /// User can use lister as `Stream<Item = Result<Entry>>`. pub struct Lister { acc: FusedAccessor, + lister: oio::Lister, /// required_metakey is the metakey required by users. required_metakey: FlagSet<Metakey>, - buf: VecDeque<oio::Entry>, - lister: Option<oio::Lister>, - listing: Option<ListFuture>, stating: Option<StatFuture>, } @@ -64,11 +61,9 @@ impl Lister { Ok(Self { acc, + lister, required_metakey, - buf: VecDeque::new(), - lister: Some(lister), - listing: None, stating: None, }) } @@ -83,52 +78,39 @@ impl Stream for Lister { // Make sure we will not poll this future again. self.stating = None; - let metadata = rp?.into_metadata(); + // TODO: we should rebuild the future if stat failed. + let metadata = match rp { + Ok(rp) => rp.into_metadata(), + Err(err) => { + let acc = self.acc.clone(); + let fut = async move { + let res = acc.stat(&path, OpStat::default()).await; + + (path, res) + }; + self.stating = Some(Box::pin(fut)); + return Poll::Ready(Some(Err(err))); + } + }; return Poll::Ready(Some(Ok(Entry::new(path, metadata)))); } - if let Some(oe) = self.buf.pop_front() { - let (path, metadata) = oe.into_entry().into_parts(); - // TODO: we can optimize this by checking the provided metakey provided by services. - if metadata.contains_metakey(self.required_metakey) { - return Poll::Ready(Some(Ok(Entry::new(path, metadata)))); - } - - let acc = self.acc.clone(); - let fut = async move { - let res = acc.stat(&path, OpStat::default()).await; - - (path, res) - }; - self.stating = Some(Box::pin(fut)); - return self.poll_next(cx); - } - - if let Some(fut) = self.listing.as_mut() { - let (op, res) = ready!(fut.poll_unpin(cx)); - - // Make sure we will not poll this future again. - self.listing = None; - - return match res? { - Some(oes) => { - self.lister = Some(op); - self.buf = oes.into(); - self.poll_next(cx) + match ready!(self.lister.poll_next(cx))? { + Some(oe) => { + let (path, metadata) = oe.into_entry().into_parts(); + // TODO: we can optimize this by checking the provided metakey provided by services. + if metadata.contains_metakey(self.required_metakey) { + return Poll::Ready(Some(Ok(Entry::new(path, metadata)))); } - None => Poll::Ready(None), - }; - } - match self.lister.take() { - Some(mut lister) => { + let acc = self.acc.clone(); let fut = async move { - let res = lister.next().await; + let res = acc.stat(&path, OpStat::default()).await; - (lister, res) + (path, res) }; - self.listing = Some(Box::pin(fut)); + self.stating = Some(Box::pin(fut)); self.poll_next(cx) } None => Poll::Ready(None),
