This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a222e4d7 feat: Add list prefix support (#3728)
9a222e4d7 is described below

commit 9a222e4d72b328a24d5775b1565292f4636bbe69
Author: Xuanwo <[email protected]>
AuthorDate: Fri Dec 8 17:55:50 2023 +0800

    feat: Add list prefix support (#3728)
    
    * Remove limit on list prefix
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Cover more test
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * feat: Add list prefix support
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Add upgrade
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 bindings/nodejs/src/lib.rs                   |  5 +-
 core/src/docs/upgrade.md                     |  4 ++
 core/src/layers/complete.rs                  | 48 ++++++++++++----
 core/src/raw/oio/list/flat_list.rs           |  2 +-
 core/src/raw/oio/list/mod.rs                 |  3 +
 core/src/raw/oio/list/prefix_list.rs         | 82 ++++++++++++++++++++++++++++
 core/src/services/dashmap/backend.rs         |  3 +-
 core/src/services/dropbox/core.rs            |  5 +-
 core/src/services/huggingface/core.rs        |  3 +-
 core/src/types/list.rs                       |  4 +-
 core/src/types/operator/blocking_operator.rs | 24 +-------
 core/src/types/operator/operator.rs          | 24 +-------
 core/tests/behavior/list.rs                  | 39 ++++++++++---
 13 files changed, 173 insertions(+), 73 deletions(-)

diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs
index bd3525bfc..ee05ee2cf 100644
--- a/bindings/nodejs/src/lib.rs
+++ b/bindings/nodejs/src/lib.rs
@@ -24,11 +24,10 @@ use std::collections::HashMap;
 use std::str::FromStr;
 use std::time::Duration;
 
-use opendal::raw::oio::BlockingRead;
-use opendal::raw::oio::ReadExt;
-
 use futures::TryStreamExt;
 use napi::bindgen_prelude::*;
+use opendal::raw::oio::BlockingRead;
+use opendal::raw::oio::ReadExt;
 
 #[napi]
 pub struct Operator(opendal::Operator);
diff --git a/core/src/docs/upgrade.md b/core/src/docs/upgrade.md
index f769cca2f..d5d8c9d7d 100644
--- a/core/src/docs/upgrade.md
+++ b/core/src/docs/upgrade.md
@@ -6,6 +6,10 @@
 
 - The `thread_pool_enabled` option has been removed.
 
+### List Prefix Supported
+
+After [RFC: List Prefix](crate::docs::rfcs::rfc_3243_list_prefix) landed, the behavior of `list` on a path without a trailing `/` has changed. OpenDAL used to return a `NotADirectory` error; now it returns the entries whose paths start with the given prefix.
+
 # Upgrade to v0.43
 
 ## Public API
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 278a9e827..d41a4d6e0 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -30,6 +30,7 @@ use bytes::Bytes;
 use crate::raw::oio::FileReader;
 use crate::raw::oio::FlatLister;
 use crate::raw::oio::LazyReader;
+use crate::raw::oio::PrefixLister;
 use crate::raw::oio::RangeReader;
 use crate::raw::oio::StreamableReader;
 use crate::raw::*;
@@ -105,10 +106,8 @@ use crate::*;
 /// Underlying services will return [`Capability`] to indicate the
 /// features that returning listers support.
 ///
-/// - If both `list_without_recursive` and `list_with_recursive`, return 
directly.
-/// - If only `list_with_recursive`, with [`oio::to_flat_lister`].
-/// - if only `list_without_recursive`, with [`oio::to_hierarchy_lister`].
-/// - If neither not supported, something must be wrong for `list` is true.
+/// - If `list_with_recursive` is supported, forward the call directly.
+/// - If not, use [`FlatLister`] if recursive, else [`PrefixLister`] for paths not ending in `/`.
 ///
 /// ## Capability Check
 ///
@@ -366,9 +365,8 @@ impl<A: Accessor> CompleteAccessor<A> {
         let recursive = args.recursive();
 
         match (recursive, cap.list_with_recursive) {
-            // - If service can list_with_recursive
-            // - If recursive is false
-            (_, true) | (false, _) => {
+            // - If service can list_with_recursive, we can forward list to it 
directly.
+            (_, true) => {
                 let (rp, p) = self.inner.list(path, args).await?;
                 Ok((rp, CompleteLister::AlreadyComplete(p)))
             }
@@ -377,6 +375,20 @@ impl<A: Accessor> CompleteAccessor<A> {
                 let p = FlatLister::new(self.inner.clone(), path);
                 Ok((RpList::default(), CompleteLister::NeedFlat(p)))
             }
+            // If not recursive and service doesn't support list_with_recursive, we need to handle
+            // list prefix by ourselves.
+            (false, false) => {
+                // Forward path that ends with /
+                if path.ends_with('/') {
+                    let (rp, p) = self.inner.list(path, args).await?;
+                    Ok((rp, CompleteLister::AlreadyComplete(p)))
+                } else {
+                    let parent = get_parent(path);
+                    let (rp, p) = self.inner.list(parent, args).await?;
+                    let p = PrefixLister::new(p, path);
+                    Ok((rp, CompleteLister::NeedPrefix(p)))
+                }
+            }
         }
     }
 
@@ -393,9 +405,8 @@ impl<A: Accessor> CompleteAccessor<A> {
         let recursive = args.recursive();
 
         match (recursive, cap.list_with_recursive) {
-            // - If service can both list_with_recursive
-            // - If recursive is false
-            (_, true) | (false, _) => {
+            // - If service can list_with_recursive, we can forward list to it 
directly.
+            (_, true) => {
                 let (rp, p) = self.inner.blocking_list(path, args)?;
                 Ok((rp, CompleteLister::AlreadyComplete(p)))
             }
@@ -404,6 +415,20 @@ impl<A: Accessor> CompleteAccessor<A> {
                 let p = FlatLister::new(self.inner.clone(), path);
                 Ok((RpList::default(), CompleteLister::NeedFlat(p)))
             }
+            // If not recursive and service doesn't support list_with_recursive, we need to handle
+            // list prefix by ourselves.
+            (false, false) => {
+                // Forward path that ends with /
+                if path.ends_with('/') {
+                    let (rp, p) = self.inner.blocking_list(path, args)?;
+                    Ok((rp, CompleteLister::AlreadyComplete(p)))
+                } else {
+                    let parent = get_parent(path);
+                    let (rp, p) = self.inner.blocking_list(parent, args)?;
+                    let p = PrefixLister::new(p, path);
+                    Ok((rp, CompleteLister::NeedPrefix(p)))
+                }
+            }
         }
     }
 }
@@ -706,6 +731,7 @@ where
 pub enum CompleteLister<A: Accessor, P> {
     AlreadyComplete(P),
     NeedFlat(FlatLister<Arc<A>, P>),
+    NeedPrefix(PrefixLister<P>),
 }
 
 #[async_trait]
@@ -720,6 +746,7 @@ where
         match self {
             AlreadyComplete(p) => p.poll_next(cx),
             NeedFlat(p) => p.poll_next(cx),
+            NeedPrefix(p) => p.poll_next(cx),
         }
     }
 }
@@ -735,6 +762,7 @@ where
         match self {
             AlreadyComplete(p) => p.next(),
             NeedFlat(p) => p.next(),
+            NeedPrefix(p) => p.next(),
         }
     }
 }
diff --git a/core/src/raw/oio/list/flat_list.rs 
b/core/src/raw/oio/list/flat_list.rs
index 9e8e2a5ac..c574d064e 100644
--- a/core/src/raw/oio/list/flat_list.rs
+++ b/core/src/raw/oio/list/flat_list.rs
@@ -28,7 +28,7 @@ use crate::*;
 /// ListFuture is the future returned while calling async list.
 type ListFuture<A, L> = BoxFuture<'static, (A, oio::Entry, Result<(RpList, 
L)>)>;
 
-/// ToFlatLister will walk dir in bottom up way:
+/// FlatLister will walk dir in bottom up way:
 ///
 /// - List nested dir first
 /// - Go back into parent dirs one by one
diff --git a/core/src/raw/oio/list/mod.rs b/core/src/raw/oio/list/mod.rs
index 62c52de3c..31f70d6f2 100644
--- a/core/src/raw/oio/list/mod.rs
+++ b/core/src/raw/oio/list/mod.rs
@@ -33,3 +33,6 @@ pub use flat_list::FlatLister;
 
 mod hierarchy_list;
 pub use hierarchy_list::HierarchyLister;
+
+mod prefix_list;
+pub use prefix_list::PrefixLister;
diff --git a/core/src/raw/oio/list/prefix_list.rs 
b/core/src/raw/oio/list/prefix_list.rs
new file mode 100644
index 000000000..f176e9bb9
--- /dev/null
+++ b/core/src/raw/oio/list/prefix_list.rs
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::task::ready;
+use std::task::Context;
+use std::task::Poll;
+
+use crate::raw::*;
+use crate::*;
+
+/// PrefixLister wraps another lister and only yields entries whose path starts with `prefix`.
+///
+/// Non-matching entries are skipped; errors and end-of-stream are passed through unchanged.
+///
+/// ```txt
+/// .
+/// ├── file_a
+/// ├── file_b
+/// └── other
+/// ```
+/// With prefix `file_`, `PrefixLister` yields `file_a` and `file_b` and skips `other`.
+pub struct PrefixLister<L> {
+    lister: L,
+    prefix: String,
+}
+
+/// # Safety
+///
+/// PrefixLister only exposes methods taking `&mut self`, so `&PrefixLister` grants no access.
+unsafe impl<L> Sync for PrefixLister<L> {}
+
+impl<L> PrefixLister<L> {
+    /// Create a new prefix lister yielding only entries of `lister` that start with `prefix`.
+    pub fn new(lister: L, prefix: &str) -> PrefixLister<L> {
+        PrefixLister {
+            lister,
+            prefix: prefix.to_string(),
+        }
+    }
+}
+
+impl<L> oio::List for PrefixLister<L>
+where
+    L: oio::List,
+{
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<oio::Entry>>> {
+        loop {
+            match ready!(self.lister.poll_next(cx)) {
+                Ok(Some(e)) if !e.path().starts_with(&self.prefix) => continue,
+                v => return Poll::Ready(v),
+            }
+        }
+    }
+}
+
+impl<L> oio::BlockingList for PrefixLister<L>
+where
+    L: oio::BlockingList,
+{
+    fn next(&mut self) -> Result<Option<oio::Entry>> {
+        loop {
+            match self.lister.next() {
+                Ok(Some(e)) if !e.path().starts_with(&self.prefix) => continue,
+                v => return v,
+            }
+        }
+    }
+}
diff --git a/core/src/services/dashmap/backend.rs 
b/core/src/services/dashmap/backend.rs
index b85b528e2..f626bcae2 100644
--- a/core/src/services/dashmap/backend.rs
+++ b/core/src/services/dashmap/backend.rs
@@ -16,7 +16,8 @@
 // under the License.
 
 use std::collections::HashMap;
-use std::fmt::{Debug, Formatter};
+use std::fmt::Debug;
+use std::fmt::Formatter;
 
 use async_trait::async_trait;
 use dashmap::DashMap;
diff --git a/core/src/services/dropbox/core.rs 
b/core/src/services/dropbox/core.rs
index dbc53f69d..3fff5115d 100644
--- a/core/src/services/dropbox/core.rs
+++ b/core/src/services/dropbox/core.rs
@@ -15,13 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use backon::ExponentialBuilder;
 use std::default::Default;
 use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;
 use std::time::Duration;
 
+use backon::ExponentialBuilder;
 use bytes::Bytes;
 use chrono::DateTime;
 use chrono::Utc;
@@ -36,7 +36,8 @@ use serde::Deserialize;
 use serde::Serialize;
 use tokio::sync::Mutex;
 
-use super::error::{parse_error, DropboxErrorResponse};
+use super::error::parse_error;
+use super::error::DropboxErrorResponse;
 use crate::raw::*;
 use crate::*;
 
diff --git a/core/src/services/huggingface/core.rs 
b/core/src/services/huggingface/core.rs
index 67963082e..768b41600 100644
--- a/core/src/services/huggingface/core.rs
+++ b/core/src/services/huggingface/core.rs
@@ -234,10 +234,11 @@ pub(super) struct HuggingfaceImport {
 
 #[cfg(test)]
 mod tests {
+    use bytes::Bytes;
+
     use super::*;
     use crate::raw::new_json_deserialize_error;
     use crate::types::Result;
-    use bytes::Bytes;
 
     #[test]
     fn parse_list_response_test() -> Result<()> {
diff --git a/core/src/types/list.rs b/core/src/types/list.rs
index bc2e75120..d9140c9da 100644
--- a/core/src/types/list.rs
+++ b/core/src/types/list.rs
@@ -40,7 +40,6 @@ use crate::*;
 ///
 /// - Lister implements `Stream<Item = Result<Entry>>`.
 /// - Lister will return `None` if there is no more entries or error has been 
returned.
-///
 pub struct Lister {
     acc: FusedAccessor,
     lister: Option<oio::Lister>,
@@ -85,8 +84,9 @@ pub struct Lister {
 ///
 /// ```rust
 /// use std::mem::size_of;
-/// use opendal::Result;
+///
 /// use opendal::Entry;
+/// use opendal::Result;
 ///
 /// assert_eq!(264, size_of::<(String, Result<opendal::raw::RpStat>)>());
 /// assert_eq!(264, size_of::<Option<Entry>>());
diff --git a/core/src/types/operator/blocking_operator.rs 
b/core/src/types/operator/blocking_operator.rs
index f811eeedb..d3efad854 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -1024,16 +1024,6 @@ impl BlockingOperator {
             path,
             OpList::default(),
             |inner, path, args| {
-                if !validate_path(&path, EntryMode::DIR) {
-                    return Err(Error::new(
-                        ErrorKind::NotADirectory,
-                        "the path trying to list should end with `/`",
-                    )
-                    .with_operation("BlockingOperator::list")
-                    .with_context("service", 
inner.info().scheme().into_static())
-                    .with_context("path", &path));
-                }
-
                 let lister = BlockingLister::create(inner, &path, args)?;
 
                 lister.collect()
@@ -1197,19 +1187,7 @@ impl BlockingOperator {
             self.inner().clone(),
             path,
             OpList::default(),
-            |inner, path, args| {
-                if !validate_path(&path, EntryMode::DIR) {
-                    return Err(Error::new(
-                        ErrorKind::NotADirectory,
-                        "the path trying to list should end with `/`",
-                    )
-                    .with_operation("BlockingOperator::list")
-                    .with_context("service", 
inner.info().scheme().into_static())
-                    .with_context("path", &path));
-                }
-
-                BlockingLister::create(inner, &path, args)
-            },
+            |inner, path, args| BlockingLister::create(inner, &path, args),
         ))
     }
 }
diff --git a/core/src/types/operator/operator.rs 
b/core/src/types/operator/operator.rs
index 635577976..3e961fdae 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -1153,16 +1153,6 @@ impl Operator {
             OpList::default(),
             |inner, path, args| {
                 let fut = async move {
-                    if !validate_path(&path, EntryMode::DIR) {
-                        return Err(Error::new(
-                            ErrorKind::NotADirectory,
-                            "the path trying to list should end with `/`",
-                        )
-                        .with_operation("Operator::list")
-                        .with_context("service", 
inner.info().scheme().into_static())
-                        .with_context("path", &path));
-                    }
-
                     let lister = Lister::create(inner, &path, args).await?;
 
                     lister.try_collect().await
@@ -1330,19 +1320,7 @@ impl Operator {
             path,
             OpList::default(),
             |inner, path, args| {
-                let fut = async move {
-                    if !validate_path(&path, EntryMode::DIR) {
-                        return Err(Error::new(
-                            ErrorKind::NotADirectory,
-                            "the path trying to list should end with `/`",
-                        )
-                        .with_operation("Operator::list")
-                        .with_context("service", 
inner.info().scheme().into_static())
-                        .with_context("path", &path));
-                    }
-
-                    Lister::create(inner, &path, args).await
-                };
+                let fut = async move { Lister::create(inner, &path, 
args).await };
                 Box::pin(fut)
             },
         ));
diff --git a/core/tests/behavior/list.rs b/core/tests/behavior/list.rs
index 17f4a695a..d9da49090 100644
--- a/core/tests/behavior/list.rs
+++ b/core/tests/behavior/list.rs
@@ -39,6 +39,7 @@ pub fn behavior_list_tests(op: &Operator) -> Vec<Trial> {
         test_list_dir,
         test_list_dir_with_metakey,
         test_list_dir_with_metakey_complete,
+        test_list_prefix,
         test_list_rich_dir,
         test_list_empty_dir,
         test_list_non_exist_dir,
@@ -46,8 +47,8 @@ pub fn behavior_list_tests(op: &Operator) -> Vec<Trial> {
         test_list_nested_dir,
         test_list_dir_with_file_path,
         test_list_with_start_after,
-        test_scan,
-        test_scan_root,
+        test_list_with_recursive,
+        test_list_root_with_recursive,
         test_remove_all
     )
 }
@@ -175,6 +176,23 @@ pub async fn test_list_dir_with_metakey_complete(op: 
Operator) -> Result<()> {
     Ok(())
 }
 
+/// Listing a strict prefix of a file's path (no trailing `/`) should return that file.
+pub async fn test_list_prefix(op: Operator) -> Result<()> {
+    let path = uuid::Uuid::new_v4().to_string();
+    debug!("Generate a random file: {}", &path);
+    let (content, _) = gen_bytes(op.info().full_capability());
+
+    op.write(&path, content).await.expect("write must succeed");
+
+    let obs = op.list(&path[..path.len() - 1]).await?;
+    assert_eq!(obs.len(), 1);
+    assert_eq!(obs[0].path(), path);
+    assert_eq!(obs[0].metadata().mode(), EntryMode::FILE);
+
+    op.delete(&path).await.expect("delete must succeed");
+    Ok(())
+}
+
 /// listing a directory, which contains more objects than a single page can 
take.
 pub async fn test_list_rich_dir(op: Operator) -> Result<()> {
     op.create_dir("test_list_rich_dir/").await?;
@@ -322,10 +340,17 @@ pub async fn test_list_nested_dir(op: Operator) -> 
Result<()> {
 /// List with path file should auto add / suffix.
 pub async fn test_list_dir_with_file_path(op: Operator) -> Result<()> {
     let parent = uuid::Uuid::new_v4().to_string();
+    let file = format!("{parent}/{}", uuid::Uuid::new_v4());
+
+    let (content, _) = gen_bytes(op.info().full_capability());
+    op.write(&file, content).await?;
+
+    let obs = op.list(&parent).await?;
+    assert_eq!(obs.len(), 1);
+    assert_eq!(obs[0].path(), format!("{parent}/"));
+    assert_eq!(obs[0].metadata().mode(), EntryMode::DIR);
 
-    let obs = op.lister(&parent).await.map(|_| ());
-    assert!(obs.is_err());
-    assert_eq!(obs.unwrap_err().kind(), ErrorKind::NotADirectory);
+    op.delete(&file).await?;
 
     Ok(())
 }
@@ -371,7 +396,7 @@ pub async fn test_list_with_start_after(op: Operator) -> 
Result<()> {
     Ok(())
 }
 
-pub async fn test_scan_root(op: Operator) -> Result<()> {
+pub async fn test_list_root_with_recursive(op: Operator) -> Result<()> {
     let w = op.lister_with("").recursive(true).await?;
     let actual = w
         .try_collect::<Vec<_>>()
@@ -386,7 +411,7 @@ pub async fn test_scan_root(op: Operator) -> Result<()> {
 }
 
 // Walk top down should output as expected
-pub async fn test_scan(op: Operator) -> Result<()> {
+pub async fn test_list_with_recursive(op: Operator) -> Result<()> {
     let parent = uuid::Uuid::new_v4().to_string();
 
     let expected = [

Reply via email to