This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch polish-lister in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit f87b2e67013ef776bb31464097860448618433cb Author: Xuanwo <[email protected]> AuthorDate: Wed Nov 15 22:59:25 2023 +0800 Cargo check Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/retry.rs | 2 +- core/src/raw/adapters/typed_kv/backend.rs | 2 +- core/src/raw/oio/list/api.rs | 2 +- core/src/raw/oio/list/into_flat_page.rs | 4 ++-- core/src/raw/oio/list/page_list.rs | 11 +++++++++++ core/src/services/dbfs/lister.rs | 2 +- core/src/services/fs/backend.rs | 8 ++++---- core/src/services/fs/lister.rs | 4 +--- core/src/services/ftp/backend.rs | 4 ++-- core/src/services/ftp/lister.rs | 6 ++---- core/src/services/hdfs/backend.rs | 8 ++++---- core/src/services/hdfs/lister.rs | 6 ++---- core/src/services/ipmfs/lister.rs | 2 -- core/src/services/sftp/backend.rs | 4 ++-- core/src/services/sftp/lister.rs | 6 +----- core/src/services/swift/lister.rs | 2 +- 16 files changed, 36 insertions(+), 37 deletions(-) diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs index bd47598a1..bed437900 100644 --- a/core/src/layers/retry.rs +++ b/core/src/layers/retry.rs @@ -1316,7 +1316,7 @@ mod tests { } #[async_trait] impl oio::List for MockLister { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { self.attempt += 1; let result = match self.attempt { 1 => Err(Error::new( diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index 89c5b2ea6..bf59510d4 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -331,7 +331,7 @@ impl KvLister { #[async_trait] impl oio::List for KvLister { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { Poll::Ready(Ok(self.inner_next())) } } diff --git a/core/src/raw/oio/list/api.rs b/core/src/raw/oio/list/api.rs index b90f995cf..cb6ed0fa0 100644 --- a/core/src/raw/oio/list/api.rs +++ b/core/src/raw/oio/list/api.rs @@ -80,7 +80,7 @@ impl<P: List + ?Sized> List for Box<P> { #[async_trait] impl List for () { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<Entry>>> { + fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Result<Option<Entry>>> { Poll::Ready(Ok(None)) } } diff --git a/core/src/raw/oio/list/into_flat_page.rs b/core/src/raw/oio/list/into_flat_page.rs index d9bfdf113..d335956a5 100644 --- a/core/src/raw/oio/list/into_flat_page.rs +++ b/core/src/raw/oio/list/into_flat_page.rs @@ -121,7 +121,7 @@ where continue; } - let (de, mut lister) = match self.active_lister.last_mut() { + let (de, lister) = match self.active_lister.last_mut() { Some((de, lister)) => (de, lister), None => return Poll::Ready(Ok(None)), }; @@ -167,7 +167,7 @@ where self.active_lister.push((Some(de), l)) } - let (de, mut lister) = match self.active_lister.last_mut() { + let (de, lister) = match self.active_lister.last_mut() { Some((de, lister)) => (de, lister), None => return Ok(None), }; diff --git a/core/src/raw/oio/list/page_list.rs b/core/src/raw/oio/list/page_list.rs index ed14a57d8..196f228b0 100644 --- a/core/src/raw/oio/list/page_list.rs +++ b/core/src/raw/oio/list/page_list.rs @@ -39,6 +39,16 @@ pub trait PageList: Send + Sync + Unpin + 'static { async fn next_page(&self, ctx: &mut PageContext) -> Result<()>; } +/// PageContext is the context passing between `PageList`. +/// +/// [`PageLister`] will init the PageContext, and implementor of [`PageList`] should fill the `PageContext` +/// based on their needs. +/// +/// - Set `done` to `true` if all page have been fetched. +/// - Update `token` if there is more page to fetch. `token` is not exposed to users, it's internal used only. +/// - Push back into the entries for each entry fetched from underlying storage. +/// +/// NOTE: `entries` is a `VecDeque` to avoid unnecessary memory allocation. Only `push_back` is allowed. pub struct PageContext { /// done is used to indicate whether the list operation is done. pub done: bool, @@ -52,6 +62,7 @@ pub struct PageContext { pub entries: VecDeque<oio::Entry>, } +/// PageLister implements [`List`] based on [`PageList`]. pub struct PageLister<L: PageList> { state: State<L>, } diff --git a/core/src/services/dbfs/lister.rs b/core/src/services/dbfs/lister.rs index 093b4c478..254afdc58 100644 --- a/core/src/services/dbfs/lister.rs +++ b/core/src/services/dbfs/lister.rs @@ -54,7 +54,7 @@ impl oio::PageList for DbfsLister { } let bytes = response.into_body().bytes().await?; - let mut decoded_response = + let decoded_response = serde_json::from_slice::<DbfsOutputList>(&bytes).map_err(new_json_deserialize_error)?; ctx.done = true; diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index c3dad8bc5..23fb42b51 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -446,7 +446,7 @@ impl Accessor for FsBackend { } } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> { let p = self.root.join(path.trim_end_matches('/')); let f = match tokio::fs::read_dir(&p).await { @@ -460,7 +460,7 @@ impl Accessor for FsBackend { } }; - let rd = FsLister::new(&self.root, f, args.limit()); + let rd = FsLister::new(&self.root, f); Ok((RpList::default(), Some(rd))) } @@ -617,7 +617,7 @@ impl Accessor for FsBackend { } } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { + fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, Self::BlockingLister)> { let p = self.root.join(path.trim_end_matches('/')); let f = match std::fs::read_dir(p) { @@ -631,7 +631,7 @@ impl Accessor for FsBackend { } }; - let rd = FsLister::new(&self.root, f, args.limit()); + let rd = FsLister::new(&self.root, f); Ok((RpList::default(), Some(rd))) } diff --git a/core/src/services/fs/lister.rs b/core/src/services/fs/lister.rs index cce70f98c..0dd3dcf56 100644 --- a/core/src/services/fs/lister.rs +++ b/core/src/services/fs/lister.rs @@ -32,17 +32,15 @@ use crate::Result; pub struct FsLister<P> { root: PathBuf, - size: usize, rd: P, fut: Option<BoxFuture<'static, (tokio::fs::DirEntry, Result<FileType>)>>, } impl<P> FsLister<P> { - pub fn new(root: &Path, rd: P, limit: Option<usize>) -> Self { + pub fn new(root: &Path, rd: P) -> Self { Self { root: root.to_owned(), - size: limit.unwrap_or(1000), rd, fut: None, diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs index 787da112f..f351dc19b 100644 --- a/core/src/services/ftp/backend.rs +++ b/core/src/services/ftp/backend.rs @@ -444,7 +444,7 @@ impl Accessor for FtpBackend { Ok(RpDelete::default()) } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> { let mut ftp_stream = self.ftp_connect(Operation::List).await?; let pathname = if path == "/" { None } else { Some(path) }; @@ -452,7 +452,7 @@ impl Accessor for FtpBackend { Ok(( RpList::default(), - FtpLister::new(if path == "/" { "" } else { path }, files, args.limit()), + FtpLister::new(if path == "/" { "" } else { path }, files), )) } } diff --git a/core/src/services/ftp/lister.rs b/core/src/services/ftp/lister.rs index 354bd4379..aa9a902bf 100644 --- a/core/src/services/ftp/lister.rs +++ b/core/src/services/ftp/lister.rs @@ -28,15 +28,13 @@ use crate::*; pub struct FtpLister { path: String, - size: usize, file_iter: IntoIter<String>, } impl FtpLister { - pub fn new(path: &str, files: Vec<String>, limit: Option<usize>) -> Self { + pub fn new(path: &str, files: Vec<String>) -> Self { Self { path: path.to_string(), - size: limit.unwrap_or(1000), file_iter: files.into_iter(), } } @@ -44,7 +42,7 @@ impl FtpLister { #[async_trait] impl oio::List for FtpLister { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { let de = match self.file_iter.next() { Some(file_str) => File::from_str(file_str.as_str()).map_err(|e| { Error::new(ErrorKind::Unexpected, "parse file from response").set_source(e) diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs index acb8d452f..1fbe8568e 100644 --- a/core/src/services/hdfs/backend.rs +++ b/core/src/services/hdfs/backend.rs @@ -296,7 +296,7 @@ impl Accessor for HdfsBackend { Ok(RpDelete::default()) } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> { let p = build_rooted_abs_path(&self.root, path); let f = match self.client.read_dir(&p) { @@ -310,7 +310,7 @@ impl Accessor for HdfsBackend { } }; - let rd = HdfsLister::new(&self.root, f, args.limit()); + let rd = HdfsLister::new(&self.root, f); Ok((RpList::default(), Some(rd))) } @@ -413,7 +413,7 @@ impl Accessor for HdfsBackend { Ok(RpDelete::default()) } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { + fn blocking_list(&self, path: &str, _: OpList) -> Result<(RpList, Self::BlockingLister)> { let p = build_rooted_abs_path(&self.root, path); let f = match self.client.read_dir(&p) { @@ -427,7 +427,7 @@ impl Accessor for HdfsBackend { } }; - let rd = HdfsLister::new(&self.root, f, args.limit()); + let rd = HdfsLister::new(&self.root, f); Ok((RpList::default(), Some(rd))) } diff --git a/core/src/services/hdfs/lister.rs b/core/src/services/hdfs/lister.rs index 7a6f7788a..50ba0ba4e 100644 --- a/core/src/services/hdfs/lister.rs +++ b/core/src/services/hdfs/lister.rs @@ -26,16 +26,14 @@ use crate::Result; pub struct HdfsLister { root: String, - size: usize, rd: hdrs::Readdir, } impl HdfsLister { - pub fn new(root: &str, rd: hdrs::Readdir, limit: Option<usize>) -> Self { + pub fn new(root: &str, rd: hdrs::Readdir) -> Self { Self { root: root.to_string(), - size: limit.unwrap_or(1000), rd, } } @@ -43,7 +41,7 @@ impl HdfsLister { #[async_trait] impl oio::List for HdfsLister { - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { let de = match self.rd.next() { Some(de) => de, None => return Poll::Ready(Ok(None)), diff --git a/core/src/services/ipmfs/lister.rs b/core/src/services/ipmfs/lister.rs index 902d32592..9198b8e31 100644 --- a/core/src/services/ipmfs/lister.rs +++ b/core/src/services/ipmfs/lister.rs @@ -32,7 +32,6 @@ pub struct IpmfsLister { backend: Arc<IpmfsBackend>, root: String, path: String, - consumed: bool, } impl IpmfsLister { @@ -41,7 +40,6 @@ impl IpmfsLister { backend, root: root.to_string(), path: path.to_string(), - consumed: false, } } } diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs index 76f792e19..47d680ef2 100644 --- a/core/src/services/sftp/backend.rs +++ b/core/src/services/sftp/backend.rs @@ -446,7 +446,7 @@ impl Accessor for SftpBackend { Ok(RpDelete::default()) } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> { let client = self.connect().await?; let mut fs = client.fs(); fs.set_cwd(&self.root); @@ -467,7 +467,7 @@ impl Accessor for SftpBackend { Ok(( RpList::default(), - Some(SftpLister::new(dir, path.to_owned(), args.limit())), + Some(SftpLister::new(dir, path.to_owned())), )) } } diff --git a/core/src/services/sftp/lister.rs b/core/src/services/sftp/lister.rs index e69f92213..eeac58af8 100644 --- a/core/src/services/sftp/lister.rs +++ b/core/src/services/sftp/lister.rs @@ -29,19 +29,15 @@ use crate::Result; pub struct SftpLister { dir: Pin<Box<ReadDir>>, prefix: String, - limit: usize, } impl SftpLister { - pub fn new(dir: ReadDir, path: String, limit: Option<usize>) -> Self { + pub fn new(dir: ReadDir, path: String) -> Self { let prefix = if path == "/" { "".to_owned() } else { path }; - let limit = limit.unwrap_or(usize::MAX); - SftpLister { dir: Box::pin(dir), prefix, - limit, } } } diff --git a/core/src/services/swift/lister.rs b/core/src/services/swift/lister.rs index 7fdfa0d5b..6291c866d 100644 --- a/core/src/services/swift/lister.rs +++ b/core/src/services/swift/lister.rs @@ -57,7 +57,7 @@ impl oio::PageList for SwiftLister { ctx.done = true; let bytes = response.into_body().bytes().await?; - let mut decoded_response = serde_json::from_slice::<Vec<ListOpResponse>>(&bytes) + let decoded_response = serde_json::from_slice::<Vec<ListOpResponse>>(&bytes) .map_err(new_json_deserialize_error)?; for status in decoded_response {
