This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch list-prefix in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit cea1f99811e3329dc1545b3f16624d563b494c02 Author: Xuanwo <[email protected]> AuthorDate: Fri Dec 8 17:27:49 2023 +0800 feat: Add list prefix support Signed-off-by: Xuanwo <[email protected]> --- bindings/nodejs/src/lib.rs | 5 +-- core/src/layers/complete.rs | 48 +++++++++++++++----- core/src/raw/oio/list/flat_list.rs | 2 +- core/src/raw/oio/list/mod.rs | 3 ++ core/src/raw/oio/list/prefix_list.rs | 82 +++++++++++++++++++++++++++++++++++ core/src/services/dashmap/backend.rs | 3 +- core/src/services/dropbox/core.rs | 5 ++- core/src/services/huggingface/core.rs | 3 +- core/src/types/list.rs | 4 +- 9 files changed, 135 insertions(+), 20 deletions(-) diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs index bd3525bfc..ee05ee2cf 100644 --- a/bindings/nodejs/src/lib.rs +++ b/bindings/nodejs/src/lib.rs @@ -24,11 +24,10 @@ use std::collections::HashMap; use std::str::FromStr; use std::time::Duration; -use opendal::raw::oio::BlockingRead; -use opendal::raw::oio::ReadExt; - use futures::TryStreamExt; use napi::bindgen_prelude::*; +use opendal::raw::oio::BlockingRead; +use opendal::raw::oio::ReadExt; #[napi] pub struct Operator(opendal::Operator); diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 278a9e827..d41a4d6e0 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -30,6 +30,7 @@ use bytes::Bytes; use crate::raw::oio::FileReader; use crate::raw::oio::FlatLister; use crate::raw::oio::LazyReader; +use crate::raw::oio::PrefixLister; use crate::raw::oio::RangeReader; use crate::raw::oio::StreamableReader; use crate::raw::*; @@ -105,10 +106,8 @@ use crate::*; /// Underlying services will return [`Capability`] to indicate the /// features that returning listers support. /// -/// - If both `list_without_recursive` and `list_with_recursive`, return directly. -/// - If only `list_with_recursive`, with [`oio::to_flat_lister`]. -/// - if only `list_without_recursive`, with [`oio::to_hierarchy_lister`]. 
-/// - If neither not supported, something must be wrong for `list` is true. +/// - If support `list_with_recursive`, return directly. +/// - if not, wrap with [`FlatLister`]. /// /// ## Capability Check /// @@ -366,9 +365,8 @@ impl<A: Accessor> CompleteAccessor<A> { let recursive = args.recursive(); match (recursive, cap.list_with_recursive) { - // - If service can list_with_recursive - // - If recursive is false - (_, true) | (false, _) => { + // - If service can list_with_recursive, we can forward list to it directly. + (_, true) => { let (rp, p) = self.inner.list(path, args).await?; Ok((rp, CompleteLister::AlreadyComplete(p))) } @@ -377,6 +375,20 @@ impl<A: Accessor> CompleteAccessor<A> { let p = FlatLister::new(self.inner.clone(), path); Ok((RpList::default(), CompleteLister::NeedFlat(p))) } + // If not recursive and service doesn't support list_with_recursive, we need to handle + // list prefix by ourselves. + (false, false) => { + // Forward path that ends with / + if path.ends_with('/') { + let (rp, p) = self.inner.list(path, args).await?; + Ok((rp, CompleteLister::AlreadyComplete(p))) + } else { + let parent = get_parent(path); + let (rp, p) = self.inner.list(parent, args).await?; + let p = PrefixLister::new(p, path); + Ok((rp, CompleteLister::NeedPrefix(p))) + } + } } } @@ -393,9 +405,8 @@ impl<A: Accessor> CompleteAccessor<A> { let recursive = args.recursive(); match (recursive, cap.list_with_recursive) { - // - If service can both list_with_recursive - // - If recursive is false - (_, true) | (false, _) => { + // - If service can list_with_recursive, we can forward list to it directly. 
+ (_, true) => { let (rp, p) = self.inner.blocking_list(path, args)?; Ok((rp, CompleteLister::AlreadyComplete(p))) } @@ -404,6 +415,20 @@ impl<A: Accessor> CompleteAccessor<A> { let p = FlatLister::new(self.inner.clone(), path); Ok((RpList::default(), CompleteLister::NeedFlat(p))) } + // If not recursive and service doesn't support list_with_recursive, we need to handle + // list prefix by ourselves. + (false, false) => { + // Forward path that ends with / + if path.ends_with('/') { + let (rp, p) = self.inner.blocking_list(path, args)?; + Ok((rp, CompleteLister::AlreadyComplete(p))) + } else { + let parent = get_parent(path); + let (rp, p) = self.inner.blocking_list(parent, args)?; + let p = PrefixLister::new(p, path); + Ok((rp, CompleteLister::NeedPrefix(p))) + } + } } } } @@ -706,6 +731,7 @@ where pub enum CompleteLister<A: Accessor, P> { AlreadyComplete(P), NeedFlat(FlatLister<Arc<A>, P>), + NeedPrefix(PrefixLister<P>), } #[async_trait] @@ -720,6 +746,7 @@ where match self { AlreadyComplete(p) => p.poll_next(cx), NeedFlat(p) => p.poll_next(cx), + NeedPrefix(p) => p.poll_next(cx), } } } @@ -735,6 +762,7 @@ where match self { AlreadyComplete(p) => p.next(), NeedFlat(p) => p.next(), + NeedPrefix(p) => p.next(), } } } diff --git a/core/src/raw/oio/list/flat_list.rs b/core/src/raw/oio/list/flat_list.rs index 9e8e2a5ac..c574d064e 100644 --- a/core/src/raw/oio/list/flat_list.rs +++ b/core/src/raw/oio/list/flat_list.rs @@ -28,7 +28,7 @@ use crate::*; /// ListFuture is the future returned while calling async list. 
type ListFuture<A, L> = BoxFuture<'static, (A, oio::Entry, Result<(RpList, L)>)>; -/// ToFlatLister will walk dir in bottom up way: +/// FlatLister will walk dir in bottom up way: /// /// - List nested dir first /// - Go back into parent dirs one by one diff --git a/core/src/raw/oio/list/mod.rs b/core/src/raw/oio/list/mod.rs index 62c52de3c..31f70d6f2 100644 --- a/core/src/raw/oio/list/mod.rs +++ b/core/src/raw/oio/list/mod.rs @@ -33,3 +33,6 @@ pub use flat_list::FlatLister; mod hierarchy_list; pub use hierarchy_list::HierarchyLister; + +mod prefix_list; +pub use prefix_list::PrefixLister; diff --git a/core/src/raw/oio/list/prefix_list.rs b/core/src/raw/oio/list/prefix_list.rs new file mode 100644 index 000000000..f176e9bb9 --- /dev/null +++ b/core/src/raw/oio/list/prefix_list.rs @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::task::ready; +use std::task::Context; +use std::task::Poll; + +use crate::raw::*; +use crate::*; + +/// PrefixLister is used to filter entries by prefix. +/// +/// For example, if we have a lister that returns entries: +/// +/// ```txt +/// . +/// ├── file_a +/// └── file_b +/// ``` +/// +/// We can use `PrefixLister` to filter entries with prefix `file_`. 
+pub struct PrefixLister<L> { + lister: L, + prefix: String, +} + +/// # Safety +/// +/// We will only take `&mut Self` reference for PrefixLister. +unsafe impl<L> Sync for PrefixLister<L> {} + +impl<L> PrefixLister<L> { + /// Create a new prefix lister + pub fn new(lister: L, prefix: &str) -> PrefixLister<L> { + PrefixLister { + lister, + prefix: prefix.to_string(), + } + } +} + +impl<L> oio::List for PrefixLister<L> +where + L: oio::List, +{ + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> { + loop { + match ready!(self.lister.poll_next(cx)) { + Ok(Some(e)) if !e.path().starts_with(&self.prefix) => continue, + v => return Poll::Ready(v), + } + } + } +} + +impl<L> oio::BlockingList for PrefixLister<L> +where + L: oio::BlockingList, +{ + fn next(&mut self) -> Result<Option<oio::Entry>> { + loop { + match self.lister.next() { + Ok(Some(e)) if !e.path().starts_with(&self.prefix) => continue, + v => return v, + } + } + } +} diff --git a/core/src/services/dashmap/backend.rs b/core/src/services/dashmap/backend.rs index b85b528e2..f626bcae2 100644 --- a/core/src/services/dashmap/backend.rs +++ b/core/src/services/dashmap/backend.rs @@ -16,7 +16,8 @@ // under the License. use std::collections::HashMap; -use std::fmt::{Debug, Formatter}; +use std::fmt::Debug; +use std::fmt::Formatter; use async_trait::async_trait; use dashmap::DashMap; diff --git a/core/src/services/dropbox/core.rs b/core/src/services/dropbox/core.rs index dbc53f69d..3fff5115d 100644 --- a/core/src/services/dropbox/core.rs +++ b/core/src/services/dropbox/core.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. 
-use backon::ExponentialBuilder; use std::default::Default; use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; use std::time::Duration; +use backon::ExponentialBuilder; use bytes::Bytes; use chrono::DateTime; use chrono::Utc; @@ -36,7 +36,8 @@ use serde::Deserialize; use serde::Serialize; use tokio::sync::Mutex; -use super::error::{parse_error, DropboxErrorResponse}; +use super::error::parse_error; +use super::error::DropboxErrorResponse; use crate::raw::*; use crate::*; diff --git a/core/src/services/huggingface/core.rs b/core/src/services/huggingface/core.rs index 67963082e..768b41600 100644 --- a/core/src/services/huggingface/core.rs +++ b/core/src/services/huggingface/core.rs @@ -234,10 +234,11 @@ pub(super) struct HuggingfaceImport { #[cfg(test)] mod tests { + use bytes::Bytes; + use super::*; use crate::raw::new_json_deserialize_error; use crate::types::Result; - use bytes::Bytes; #[test] fn parse_list_response_test() -> Result<()> { diff --git a/core/src/types/list.rs b/core/src/types/list.rs index bc2e75120..d9140c9da 100644 --- a/core/src/types/list.rs +++ b/core/src/types/list.rs @@ -40,7 +40,6 @@ use crate::*; /// /// - Lister implements `Stream<Item = Result<Entry>>`. /// - Lister will return `None` if there is no more entries or error has been returned. -/// pub struct Lister { acc: FusedAccessor, lister: Option<oio::Lister>, @@ -85,8 +84,9 @@ pub struct Lister { /// /// ```rust /// use std::mem::size_of; -/// use opendal::Result; +/// /// use opendal::Entry; +/// use opendal::Result; /// /// assert_eq!(264, size_of::<(String, Result<opendal::raw::RpStat>)>()); /// assert_eq!(264, size_of::<Option<Entry>>());
