This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch list-prefix
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit cea1f99811e3329dc1545b3f16624d563b494c02
Author: Xuanwo <[email protected]>
AuthorDate: Fri Dec 8 17:27:49 2023 +0800

    feat: Add list prefix support
    
    Signed-off-by: Xuanwo <[email protected]>
---
 bindings/nodejs/src/lib.rs            |  5 +--
 core/src/layers/complete.rs           | 48 +++++++++++++++-----
 core/src/raw/oio/list/flat_list.rs    |  2 +-
 core/src/raw/oio/list/mod.rs          |  3 ++
 core/src/raw/oio/list/prefix_list.rs  | 82 +++++++++++++++++++++++++++++++++++
 core/src/services/dashmap/backend.rs  |  3 +-
 core/src/services/dropbox/core.rs     |  5 ++-
 core/src/services/huggingface/core.rs |  3 +-
 core/src/types/list.rs                |  4 +-
 9 files changed, 135 insertions(+), 20 deletions(-)

diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs
index bd3525bfc..ee05ee2cf 100644
--- a/bindings/nodejs/src/lib.rs
+++ b/bindings/nodejs/src/lib.rs
@@ -24,11 +24,10 @@ use std::collections::HashMap;
 use std::str::FromStr;
 use std::time::Duration;
 
-use opendal::raw::oio::BlockingRead;
-use opendal::raw::oio::ReadExt;
-
 use futures::TryStreamExt;
 use napi::bindgen_prelude::*;
+use opendal::raw::oio::BlockingRead;
+use opendal::raw::oio::ReadExt;
 
 #[napi]
 pub struct Operator(opendal::Operator);
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 278a9e827..d41a4d6e0 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -30,6 +30,7 @@ use bytes::Bytes;
 use crate::raw::oio::FileReader;
 use crate::raw::oio::FlatLister;
 use crate::raw::oio::LazyReader;
+use crate::raw::oio::PrefixLister;
 use crate::raw::oio::RangeReader;
 use crate::raw::oio::StreamableReader;
 use crate::raw::*;
@@ -105,10 +106,8 @@ use crate::*;
 /// Underlying services will return [`Capability`] to indicate the
 /// features that returning listers support.
 ///
-/// - If both `list_without_recursive` and `list_with_recursive`, return directly.
-/// - If only `list_with_recursive`, with [`oio::to_flat_lister`].
-/// - if only `list_without_recursive`, with [`oio::to_hierarchy_lister`].
-/// - If neither not supported, something must be wrong for `list` is true.
+/// - If the service supports `list_with_recursive`, forward the list call directly.
+/// - Otherwise, use [`FlatLister`] when recursive, or [`PrefixLister`] for non-dir paths.
 ///
 /// ## Capability Check
 ///
@@ -366,9 +365,8 @@ impl<A: Accessor> CompleteAccessor<A> {
         let recursive = args.recursive();
 
         match (recursive, cap.list_with_recursive) {
-            // - If service can list_with_recursive
-            // - If recursive is false
-            (_, true) | (false, _) => {
+            // - If service can list_with_recursive, we can forward list to it directly.
+            (_, true) => {
                 let (rp, p) = self.inner.list(path, args).await?;
                 Ok((rp, CompleteLister::AlreadyComplete(p)))
             }
@@ -377,6 +375,20 @@ impl<A: Accessor> CompleteAccessor<A> {
                 let p = FlatLister::new(self.inner.clone(), path);
                 Ok((RpList::default(), CompleteLister::NeedFlat(p)))
             }
+            // If not recursive and service doesn't support list_with_recursive, we need to
+            // handle list prefix by ourselves.
+            (false, false) => {
+                // Forward path that ends with /
+                if path.ends_with('/') {
+                    let (rp, p) = self.inner.list(path, args).await?;
+                    Ok((rp, CompleteLister::AlreadyComplete(p)))
+                } else {
+                    let parent = get_parent(path);
+                    let (rp, p) = self.inner.list(parent, args).await?;
+                    let p = PrefixLister::new(p, path);
+                    Ok((rp, CompleteLister::NeedPrefix(p)))
+                }
+            }
         }
     }
 
@@ -393,9 +405,8 @@ impl<A: Accessor> CompleteAccessor<A> {
         let recursive = args.recursive();
 
         match (recursive, cap.list_with_recursive) {
-            // - If service can both list_with_recursive
-            // - If recursive is false
-            (_, true) | (false, _) => {
+            // - If service can list_with_recursive, we can forward list to it directly.
+            (_, true) => {
                 let (rp, p) = self.inner.blocking_list(path, args)?;
                 Ok((rp, CompleteLister::AlreadyComplete(p)))
             }
@@ -404,6 +415,20 @@ impl<A: Accessor> CompleteAccessor<A> {
                 let p = FlatLister::new(self.inner.clone(), path);
                 Ok((RpList::default(), CompleteLister::NeedFlat(p)))
             }
+            // If not recursive and service doesn't support list_with_recursive, we need to
+            // handle list prefix by ourselves.
+            (false, false) => {
+                // Forward path that ends with /
+                if path.ends_with('/') {
+                    let (rp, p) = self.inner.blocking_list(path, args)?;
+                    Ok((rp, CompleteLister::AlreadyComplete(p)))
+                } else {
+                    let parent = get_parent(path);
+                    let (rp, p) = self.inner.blocking_list(parent, args)?;
+                    let p = PrefixLister::new(p, path);
+                    Ok((rp, CompleteLister::NeedPrefix(p)))
+                }
+            }
         }
     }
 }
@@ -706,6 +731,7 @@ where
 pub enum CompleteLister<A: Accessor, P> {
     AlreadyComplete(P),
     NeedFlat(FlatLister<Arc<A>, P>),
+    NeedPrefix(PrefixLister<P>), // non-recursive list of a path not ending with `/`
 }
 
 #[async_trait]
@@ -720,6 +746,7 @@ where
         match self {
             AlreadyComplete(p) => p.poll_next(cx),
             NeedFlat(p) => p.poll_next(cx),
+            NeedPrefix(p) => p.poll_next(cx),
         }
     }
 }
@@ -735,6 +762,7 @@ where
         match self {
             AlreadyComplete(p) => p.next(),
             NeedFlat(p) => p.next(),
+            NeedPrefix(p) => p.next(),
         }
     }
 }
diff --git a/core/src/raw/oio/list/flat_list.rs b/core/src/raw/oio/list/flat_list.rs
index 9e8e2a5ac..c574d064e 100644
--- a/core/src/raw/oio/list/flat_list.rs
+++ b/core/src/raw/oio/list/flat_list.rs
@@ -28,7 +28,7 @@ use crate::*;
 /// ListFuture is the future returned while calling async list.
 type ListFuture<A, L> = BoxFuture<'static, (A, oio::Entry, Result<(RpList, L)>)>;
 
-/// ToFlatLister will walk dir in bottom up way:
+/// FlatLister will walk dir in bottom up way:
 ///
 /// - List nested dir first
 /// - Go back into parent dirs one by one
diff --git a/core/src/raw/oio/list/mod.rs b/core/src/raw/oio/list/mod.rs
index 62c52de3c..31f70d6f2 100644
--- a/core/src/raw/oio/list/mod.rs
+++ b/core/src/raw/oio/list/mod.rs
@@ -33,3 +33,6 @@ pub use flat_list::FlatLister;
 
 mod hierarchy_list;
 pub use hierarchy_list::HierarchyLister;
+
+mod prefix_list;
+pub use prefix_list::PrefixLister;
diff --git a/core/src/raw/oio/list/prefix_list.rs b/core/src/raw/oio/list/prefix_list.rs
new file mode 100644
index 000000000..f176e9bb9
--- /dev/null
+++ b/core/src/raw/oio/list/prefix_list.rs
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::task::ready;
+use std::task::Context;
+use std::task::Poll;
+
+use crate::raw::*;
+use crate::*;
+
+/// PrefixLister only yields entries whose full path (`Entry::path`) starts with a prefix.
+///
+/// For example, if we have a lister that returns entries:
+///
+/// ```txt
+/// .
+/// ├── file_a
+/// └── file_b
+/// ```
+///
+/// then wrapping it in a `PrefixLister` with prefix `file_a` yields only `file_a`.
+pub struct PrefixLister<L> {
+    lister: L, // inner lister whose entries are filtered
+    prefix: String, // matched with `str::starts_with` against each entry path
+}
+
+/// # Safety
+///
+/// We only take `&mut Self` references to PrefixLister (it has no `&self` methods).
+unsafe impl<L> Sync for PrefixLister<L> {}
+
+impl<L> PrefixLister<L> {
+    /// Create a lister yielding only the entries of `lister` whose path starts with `prefix`.
+    pub fn new(lister: L, prefix: &str) -> PrefixLister<L> {
+        PrefixLister {
+            lister,
+            prefix: prefix.to_string(),
+        }
+    }
+}
+
+impl<L: oio::List> oio::List for PrefixLister<L> {
+    /// Polls the inner lister, skipping entries whose path does not start with the prefix.
+    ///
+    /// Errors, `Pending` and end-of-stream from the inner lister are passed through unchanged.
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
+        loop {
+            match ready!(self.lister.poll_next(cx)) {
+                Ok(Some(e)) if !e.path().starts_with(&self.prefix) => continue,
+                v => return Poll::Ready(v),
+            }
+        }
+    }
+}
+
+impl<L: oio::BlockingList> oio::BlockingList for PrefixLister<L> {
+    /// Returns the next entry whose path starts with `self.prefix`.
+    ///
+    /// Errors from the inner lister and end-of-stream are passed through unchanged.
+    fn next(&mut self) -> Result<Option<oio::Entry>> {
+        while let Some(entry) = self.lister.next()? {
+            if entry.path().starts_with(&self.prefix) {
+                return Ok(Some(entry));
+            }
+        }
+        Ok(None)
+    }
+}
diff --git a/core/src/services/dashmap/backend.rs b/core/src/services/dashmap/backend.rs
index b85b528e2..f626bcae2 100644
--- a/core/src/services/dashmap/backend.rs
+++ b/core/src/services/dashmap/backend.rs
@@ -16,7 +16,8 @@
 // under the License.
 
 use std::collections::HashMap;
-use std::fmt::{Debug, Formatter};
+use std::fmt::Debug;
+use std::fmt::Formatter;
 
 use async_trait::async_trait;
 use dashmap::DashMap;
diff --git a/core/src/services/dropbox/core.rs b/core/src/services/dropbox/core.rs
index dbc53f69d..3fff5115d 100644
--- a/core/src/services/dropbox/core.rs
+++ b/core/src/services/dropbox/core.rs
@@ -15,13 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use backon::ExponentialBuilder;
 use std::default::Default;
 use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;
 use std::time::Duration;
 
+use backon::ExponentialBuilder;
 use bytes::Bytes;
 use chrono::DateTime;
 use chrono::Utc;
@@ -36,7 +36,8 @@ use serde::Deserialize;
 use serde::Serialize;
 use tokio::sync::Mutex;
 
-use super::error::{parse_error, DropboxErrorResponse};
+use super::error::parse_error;
+use super::error::DropboxErrorResponse;
 use crate::raw::*;
 use crate::*;
 
diff --git a/core/src/services/huggingface/core.rs b/core/src/services/huggingface/core.rs
index 67963082e..768b41600 100644
--- a/core/src/services/huggingface/core.rs
+++ b/core/src/services/huggingface/core.rs
@@ -234,10 +234,11 @@ pub(super) struct HuggingfaceImport {
 
 #[cfg(test)]
 mod tests {
+    use bytes::Bytes;
+
     use super::*;
     use crate::raw::new_json_deserialize_error;
     use crate::types::Result;
-    use bytes::Bytes;
 
     #[test]
     fn parse_list_response_test() -> Result<()> {
diff --git a/core/src/types/list.rs b/core/src/types/list.rs
index bc2e75120..d9140c9da 100644
--- a/core/src/types/list.rs
+++ b/core/src/types/list.rs
@@ -40,7 +40,6 @@ use crate::*;
 ///
 /// - Lister implements `Stream<Item = Result<Entry>>`.
 /// - Lister will return `None` if there is no more entries or error has been returned.
-///
 pub struct Lister {
     acc: FusedAccessor,
     lister: Option<oio::Lister>,
@@ -85,8 +84,9 @@ pub struct Lister {
 ///
 /// ```rust
 /// use std::mem::size_of;
-/// use opendal::Result;
+///
 /// use opendal::Entry;
+/// use opendal::Result;
 ///
 /// assert_eq!(264, size_of::<(String, Result<opendal::raw::RpStat>)>());
 /// assert_eq!(264, size_of::<Option<Entry>>());

Reply via email to