This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch add-list-with-metakey-test in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 3eae5ee30b0ac8de345372304dc0be8531a3eac5 Author: Xuanwo <[email protected]> AuthorDate: Mon Sep 18 19:35:23 2023 +0800 Implement metakey for blocking lister Signed-off-by: Xuanwo <[email protected]> --- core/src/types/list.rs | 46 ++++++++++++++++++++++------ core/src/types/operator/blocking_operator.rs | 7 ++--- 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/core/src/types/list.rs b/core/src/types/list.rs index 566065a69..0c7a82d38 100644 --- a/core/src/types/list.rs +++ b/core/src/types/list.rs @@ -142,7 +142,11 @@ impl Stream for Lister { /// /// Users can construct Lister by `blocking_lister`. pub struct BlockingLister { - pager: oio::BlockingPager, + acc: FusedAccessor, + /// required_metakey is the metakey required by users. + required_metakey: FlagSet<Metakey>, + + pager: Option<oio::BlockingPager>, buf: VecDeque<oio::Entry>, } @@ -153,11 +157,17 @@ unsafe impl Sync for BlockingLister {} impl BlockingLister { /// Create a new lister. - pub(crate) fn new(pager: oio::BlockingPager) -> Self { - Self { - pager, - buf: VecDeque::default(), - } + pub(crate) fn create(acc: FusedAccessor, path: &str, args: OpList) -> Result<Self> { + let required_metakey = args.metakey(); + let (_, pager) = acc.blocking_list(path, args)?; + + Ok(Self { + acc, + required_metakey, + + buf: VecDeque::new(), + pager: Some(pager), + }) } } @@ -167,15 +177,33 @@ impl Iterator for BlockingLister { fn next(&mut self) -> Option<Self::Item> { if let Some(oe) = self.buf.pop_front() { - return Some(Ok(oe.into_entry())); + let (path, metadata) = oe.into_entry().into_parts(); + // TODO: we can optimize this by checking the provided metakey provided by services. + if metadata.contains_bit(self.required_metakey) { + return Some(Ok(Entry::new(path, metadata))); + } + + let metadata = match self.acc.blocking_stat(&path, OpStat::default()) { + Ok(rp) => rp.into_metadata(), + Err(err) => return Some(Err(err)), + }; + return Some(Ok(Entry::new(path, metadata))); } - self.buf = match self.pager.next() { + let pager = match self.pager.as_mut() { + Some(pager) => pager, + None => return None, + }; + + self.buf = match pager.next() { // Ideally, the convert from `Vec` to `VecDeque` will not do reallocation. // // However, this could be changed as described in [impl<T, A> From<Vec<T, A>> for VecDeque<T, A>](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E) Ok(Some(entries)) => entries.into(), - Ok(None) => return None, + Ok(None) => { + self.pager = None; + return None; + } Err(err) => return Some(Err(err)), }; diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index 22a6208f1..1dea4b622 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -941,8 +941,7 @@ impl BlockingOperator { .with_context("path", &path)); } - let (_, pager) = inner.blocking_list(&path, args)?; - let lister = BlockingLister::new(pager); + let lister = BlockingLister::create(inner, &path, args)?; lister.collect() }, @@ -1116,9 +1115,7 @@ impl BlockingOperator { .with_context("path", &path)); } - let (_, pager) = inner.blocking_list(&path, args)?; - - Ok(BlockingLister::new(pager)) + BlockingLister::create(inner, &path, args) }, )) }
