This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 9a222e4d7 feat: Add list prefix support (#3728)
9a222e4d7 is described below
commit 9a222e4d72b328a24d5775b1565292f4636bbe69
Author: Xuanwo <[email protected]>
AuthorDate: Fri Dec 8 17:55:50 2023 +0800
feat: Add list prefix support (#3728)
* Remove limit on list prefix
Signed-off-by: Xuanwo <[email protected]>
* Cover more test
Signed-off-by: Xuanwo <[email protected]>
* feat: Add list prefix support
Signed-off-by: Xuanwo <[email protected]>
* Add upgrade
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
bindings/nodejs/src/lib.rs | 5 +-
core/src/docs/upgrade.md | 4 ++
core/src/layers/complete.rs | 48 ++++++++++++----
core/src/raw/oio/list/flat_list.rs | 2 +-
core/src/raw/oio/list/mod.rs | 3 +
core/src/raw/oio/list/prefix_list.rs | 82 ++++++++++++++++++++++++++++
core/src/services/dashmap/backend.rs | 3 +-
core/src/services/dropbox/core.rs | 5 +-
core/src/services/huggingface/core.rs | 3 +-
core/src/types/list.rs | 4 +-
core/src/types/operator/blocking_operator.rs | 24 +-------
core/src/types/operator/operator.rs | 24 +-------
core/tests/behavior/list.rs | 39 ++++++++++---
13 files changed, 173 insertions(+), 73 deletions(-)
diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs
index bd3525bfc..ee05ee2cf 100644
--- a/bindings/nodejs/src/lib.rs
+++ b/bindings/nodejs/src/lib.rs
@@ -24,11 +24,10 @@ use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;
-use opendal::raw::oio::BlockingRead;
-use opendal::raw::oio::ReadExt;
-
use futures::TryStreamExt;
use napi::bindgen_prelude::*;
+use opendal::raw::oio::BlockingRead;
+use opendal::raw::oio::ReadExt;
#[napi]
pub struct Operator(opendal::Operator);
diff --git a/core/src/docs/upgrade.md b/core/src/docs/upgrade.md
index f769cca2f..d5d8c9d7d 100644
--- a/core/src/docs/upgrade.md
+++ b/core/src/docs/upgrade.md
@@ -6,6 +6,10 @@
- The `thread_pool_enabled` option has been removed.
+### List Prefix Supported
+
+After [RFC: List Prefix](crate::docs::rfcs::rfc_3243_list_prefix) landed, we
have changed the behavior of `list` on a path that does not end with `/`. OpenDAL used to return
a `NotADirectory` error, but now it returns the entries whose paths start
with the given prefix instead.
+
# Upgrade to v0.43
## Public API
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 278a9e827..d41a4d6e0 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -30,6 +30,7 @@ use bytes::Bytes;
use crate::raw::oio::FileReader;
use crate::raw::oio::FlatLister;
use crate::raw::oio::LazyReader;
+use crate::raw::oio::PrefixLister;
use crate::raw::oio::RangeReader;
use crate::raw::oio::StreamableReader;
use crate::raw::*;
@@ -105,10 +106,8 @@ use crate::*;
/// Underlying services will return [`Capability`] to indicate the
/// features that returning listers support.
///
-/// - If both `list_without_recursive` and `list_with_recursive`, return
directly.
-/// - If only `list_with_recursive`, with [`oio::to_flat_lister`].
-/// - if only `list_without_recursive`, with [`oio::to_hierarchy_lister`].
-/// - If neither not supported, something must be wrong for `list` is true.
+/// - If the service supports `list_with_recursive`, return the lister directly.
+/// - If not, wrap it with [`FlatLister`].
///
/// ## Capability Check
///
@@ -366,9 +365,8 @@ impl<A: Accessor> CompleteAccessor<A> {
let recursive = args.recursive();
match (recursive, cap.list_with_recursive) {
- // - If service can list_with_recursive
- // - If recursive is false
- (_, true) | (false, _) => {
+ // - If service can list_with_recursive, we can forward list to it
directly.
+ (_, true) => {
let (rp, p) = self.inner.list(path, args).await?;
Ok((rp, CompleteLister::AlreadyComplete(p)))
}
@@ -377,6 +375,20 @@ impl<A: Accessor> CompleteAccessor<A> {
let p = FlatLister::new(self.inner.clone(), path);
Ok((RpList::default(), CompleteLister::NeedFlat(p)))
}
+ // If not recursive and service doesn't support list_with_recursive,
we need to handle
+ // list prefix by ourselves.
+ (false, false) => {
+ // Forward path that ends with /
+ if path.ends_with('/') {
+ let (rp, p) = self.inner.list(path, args).await?;
+ Ok((rp, CompleteLister::AlreadyComplete(p)))
+ } else {
+ let parent = get_parent(path);
+ let (rp, p) = self.inner.list(parent, args).await?;
+ let p = PrefixLister::new(p, path);
+ Ok((rp, CompleteLister::NeedPrefix(p)))
+ }
+ }
}
}
@@ -393,9 +405,8 @@ impl<A: Accessor> CompleteAccessor<A> {
let recursive = args.recursive();
match (recursive, cap.list_with_recursive) {
- // - If service can both list_with_recursive
- // - If recursive is false
- (_, true) | (false, _) => {
+ // - If service can list_with_recursive, we can forward list to it
directly.
+ (_, true) => {
let (rp, p) = self.inner.blocking_list(path, args)?;
Ok((rp, CompleteLister::AlreadyComplete(p)))
}
@@ -404,6 +415,20 @@ impl<A: Accessor> CompleteAccessor<A> {
let p = FlatLister::new(self.inner.clone(), path);
Ok((RpList::default(), CompleteLister::NeedFlat(p)))
}
+ // If not recursive and service doesn't support list_with_recursive,
we need to handle
+ // list prefix by ourselves.
+ (false, false) => {
+ // Forward path that ends with /
+ if path.ends_with('/') {
+ let (rp, p) = self.inner.blocking_list(path, args)?;
+ Ok((rp, CompleteLister::AlreadyComplete(p)))
+ } else {
+ let parent = get_parent(path);
+ let (rp, p) = self.inner.blocking_list(parent, args)?;
+ let p = PrefixLister::new(p, path);
+ Ok((rp, CompleteLister::NeedPrefix(p)))
+ }
+ }
}
}
}
@@ -706,6 +731,7 @@ where
pub enum CompleteLister<A: Accessor, P> {
AlreadyComplete(P),
NeedFlat(FlatLister<Arc<A>, P>),
+ NeedPrefix(PrefixLister<P>),
}
#[async_trait]
@@ -720,6 +746,7 @@ where
match self {
AlreadyComplete(p) => p.poll_next(cx),
NeedFlat(p) => p.poll_next(cx),
+ NeedPrefix(p) => p.poll_next(cx),
}
}
}
@@ -735,6 +762,7 @@ where
match self {
AlreadyComplete(p) => p.next(),
NeedFlat(p) => p.next(),
+ NeedPrefix(p) => p.next(),
}
}
}
diff --git a/core/src/raw/oio/list/flat_list.rs
b/core/src/raw/oio/list/flat_list.rs
index 9e8e2a5ac..c574d064e 100644
--- a/core/src/raw/oio/list/flat_list.rs
+++ b/core/src/raw/oio/list/flat_list.rs
@@ -28,7 +28,7 @@ use crate::*;
/// ListFuture is the future returned while calling async list.
type ListFuture<A, L> = BoxFuture<'static, (A, oio::Entry, Result<(RpList,
L)>)>;
-/// ToFlatLister will walk dir in bottom up way:
+/// FlatLister will walk dir in bottom up way:
///
/// - List nested dir first
/// - Go back into parent dirs one by one
diff --git a/core/src/raw/oio/list/mod.rs b/core/src/raw/oio/list/mod.rs
index 62c52de3c..31f70d6f2 100644
--- a/core/src/raw/oio/list/mod.rs
+++ b/core/src/raw/oio/list/mod.rs
@@ -33,3 +33,6 @@ pub use flat_list::FlatLister;
mod hierarchy_list;
pub use hierarchy_list::HierarchyLister;
+
+mod prefix_list;
+pub use prefix_list::PrefixLister;
diff --git a/core/src/raw/oio/list/prefix_list.rs
b/core/src/raw/oio/list/prefix_list.rs
new file mode 100644
index 000000000..f176e9bb9
--- /dev/null
+++ b/core/src/raw/oio/list/prefix_list.rs
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::task::ready;
+use std::task::Context;
+use std::task::Poll;
+
+use crate::raw::*;
+use crate::*;
+
+/// PrefixLister is used to filter entries by prefix.
+///
+/// For example, if we have a lister that returns entries:
+///
+/// ```txt
+/// .
+/// ├── file_a
+/// └── file_b
+/// ```
+///
+/// We can use `PrefixLister` to filter entries with prefix `file_`.
+pub struct PrefixLister<L> {
+ lister: L,
+ prefix: String,
+}
+
+/// # Safety
+///
+/// We will only take `&mut Self` reference for PrefixLister.
+unsafe impl<L> Sync for PrefixLister<L> {}
+
+impl<L> PrefixLister<L> {
+ /// Create a new prefix lister that only yields entries whose path starts with `prefix`.
+ pub fn new(lister: L, prefix: &str) -> PrefixLister<L> {
+ PrefixLister {
+ lister,
+ prefix: prefix.to_string(),
+ }
+ }
+}
+
+impl<L> oio::List for PrefixLister<L>
+where
+ L: oio::List,
+{
+ fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Result<Option<oio::Entry>>> {
+ loop {
+ match ready!(self.lister.poll_next(cx)) {
+ Ok(Some(e)) if !e.path().starts_with(&self.prefix) => continue,
+ v => return Poll::Ready(v),
+ }
+ }
+ }
+}
+
+impl<L> oio::BlockingList for PrefixLister<L>
+where
+ L: oio::BlockingList,
+{
+ fn next(&mut self) -> Result<Option<oio::Entry>> {
+ loop {
+ match self.lister.next() {
+ Ok(Some(e)) if !e.path().starts_with(&self.prefix) => continue,
+ v => return v,
+ }
+ }
+ }
+}
diff --git a/core/src/services/dashmap/backend.rs
b/core/src/services/dashmap/backend.rs
index b85b528e2..f626bcae2 100644
--- a/core/src/services/dashmap/backend.rs
+++ b/core/src/services/dashmap/backend.rs
@@ -16,7 +16,8 @@
// under the License.
use std::collections::HashMap;
-use std::fmt::{Debug, Formatter};
+use std::fmt::Debug;
+use std::fmt::Formatter;
use async_trait::async_trait;
use dashmap::DashMap;
diff --git a/core/src/services/dropbox/core.rs
b/core/src/services/dropbox/core.rs
index dbc53f69d..3fff5115d 100644
--- a/core/src/services/dropbox/core.rs
+++ b/core/src/services/dropbox/core.rs
@@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-use backon::ExponentialBuilder;
use std::default::Default;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
use std::time::Duration;
+use backon::ExponentialBuilder;
use bytes::Bytes;
use chrono::DateTime;
use chrono::Utc;
@@ -36,7 +36,8 @@ use serde::Deserialize;
use serde::Serialize;
use tokio::sync::Mutex;
-use super::error::{parse_error, DropboxErrorResponse};
+use super::error::parse_error;
+use super::error::DropboxErrorResponse;
use crate::raw::*;
use crate::*;
diff --git a/core/src/services/huggingface/core.rs
b/core/src/services/huggingface/core.rs
index 67963082e..768b41600 100644
--- a/core/src/services/huggingface/core.rs
+++ b/core/src/services/huggingface/core.rs
@@ -234,10 +234,11 @@ pub(super) struct HuggingfaceImport {
#[cfg(test)]
mod tests {
+ use bytes::Bytes;
+
use super::*;
use crate::raw::new_json_deserialize_error;
use crate::types::Result;
- use bytes::Bytes;
#[test]
fn parse_list_response_test() -> Result<()> {
diff --git a/core/src/types/list.rs b/core/src/types/list.rs
index bc2e75120..d9140c9da 100644
--- a/core/src/types/list.rs
+++ b/core/src/types/list.rs
@@ -40,7 +40,6 @@ use crate::*;
///
/// - Lister implements `Stream<Item = Result<Entry>>`.
/// - Lister will return `None` if there is no more entries or error has been
returned.
-///
pub struct Lister {
acc: FusedAccessor,
lister: Option<oio::Lister>,
@@ -85,8 +84,9 @@ pub struct Lister {
///
/// ```rust
/// use std::mem::size_of;
-/// use opendal::Result;
+///
/// use opendal::Entry;
+/// use opendal::Result;
///
/// assert_eq!(264, size_of::<(String, Result<opendal::raw::RpStat>)>());
/// assert_eq!(264, size_of::<Option<Entry>>());
diff --git a/core/src/types/operator/blocking_operator.rs
b/core/src/types/operator/blocking_operator.rs
index f811eeedb..d3efad854 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -1024,16 +1024,6 @@ impl BlockingOperator {
path,
OpList::default(),
|inner, path, args| {
- if !validate_path(&path, EntryMode::DIR) {
- return Err(Error::new(
- ErrorKind::NotADirectory,
- "the path trying to list should end with `/`",
- )
- .with_operation("BlockingOperator::list")
- .with_context("service",
inner.info().scheme().into_static())
- .with_context("path", &path));
- }
-
let lister = BlockingLister::create(inner, &path, args)?;
lister.collect()
@@ -1197,19 +1187,7 @@ impl BlockingOperator {
self.inner().clone(),
path,
OpList::default(),
- |inner, path, args| {
- if !validate_path(&path, EntryMode::DIR) {
- return Err(Error::new(
- ErrorKind::NotADirectory,
- "the path trying to list should end with `/`",
- )
- .with_operation("BlockingOperator::list")
- .with_context("service",
inner.info().scheme().into_static())
- .with_context("path", &path));
- }
-
- BlockingLister::create(inner, &path, args)
- },
+ |inner, path, args| BlockingLister::create(inner, &path, args),
))
}
}
diff --git a/core/src/types/operator/operator.rs
b/core/src/types/operator/operator.rs
index 635577976..3e961fdae 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -1153,16 +1153,6 @@ impl Operator {
OpList::default(),
|inner, path, args| {
let fut = async move {
- if !validate_path(&path, EntryMode::DIR) {
- return Err(Error::new(
- ErrorKind::NotADirectory,
- "the path trying to list should end with `/`",
- )
- .with_operation("Operator::list")
- .with_context("service",
inner.info().scheme().into_static())
- .with_context("path", &path));
- }
-
let lister = Lister::create(inner, &path, args).await?;
lister.try_collect().await
@@ -1330,19 +1320,7 @@ impl Operator {
path,
OpList::default(),
|inner, path, args| {
- let fut = async move {
- if !validate_path(&path, EntryMode::DIR) {
- return Err(Error::new(
- ErrorKind::NotADirectory,
- "the path trying to list should end with `/`",
- )
- .with_operation("Operator::list")
- .with_context("service",
inner.info().scheme().into_static())
- .with_context("path", &path));
- }
-
- Lister::create(inner, &path, args).await
- };
+ let fut = async move { Lister::create(inner, &path,
args).await };
Box::pin(fut)
},
));
diff --git a/core/tests/behavior/list.rs b/core/tests/behavior/list.rs
index 17f4a695a..d9da49090 100644
--- a/core/tests/behavior/list.rs
+++ b/core/tests/behavior/list.rs
@@ -39,6 +39,7 @@ pub fn behavior_list_tests(op: &Operator) -> Vec<Trial> {
test_list_dir,
test_list_dir_with_metakey,
test_list_dir_with_metakey_complete,
+ test_list_prefix,
test_list_rich_dir,
test_list_empty_dir,
test_list_non_exist_dir,
@@ -46,8 +47,8 @@ pub fn behavior_list_tests(op: &Operator) -> Vec<Trial> {
test_list_nested_dir,
test_list_dir_with_file_path,
test_list_with_start_after,
- test_scan,
- test_scan_root,
+ test_list_with_recursive,
+ test_list_root_with_recursive,
test_remove_all
)
}
@@ -175,6 +176,23 @@ pub async fn test_list_dir_with_metakey_complete(op:
Operator) -> Result<()> {
Ok(())
}
+/// List prefix should return newly created file.
+pub async fn test_list_prefix(op: Operator) -> Result<()> {
+ let path = uuid::Uuid::new_v4().to_string();
+ debug!("Generate a random file: {}", &path);
+ let (content, _) = gen_bytes(op.info().full_capability());
+
+ op.write(&path, content).await.expect("write must succeed");
+
+ let obs = op.list(&path[..path.len() - 1]).await?;
+ assert_eq!(obs.len(), 1);
+ assert_eq!(obs[0].path(), path);
+ assert_eq!(obs[0].metadata().mode(), EntryMode::FILE);
+
+ op.delete(&path).await.expect("delete must succeed");
+ Ok(())
+}
+
/// listing a directory, which contains more objects than a single page can
take.
pub async fn test_list_rich_dir(op: Operator) -> Result<()> {
op.create_dir("test_list_rich_dir/").await?;
@@ -322,10 +340,17 @@ pub async fn test_list_nested_dir(op: Operator) ->
Result<()> {
/// List with path file should auto add / suffix.
pub async fn test_list_dir_with_file_path(op: Operator) -> Result<()> {
let parent = uuid::Uuid::new_v4().to_string();
+ let file = format!("{parent}/{}", uuid::Uuid::new_v4());
+
+ let (content, _) = gen_bytes(op.info().full_capability());
+ op.write(&file, content).await?;
+
+ let obs = op.list(&parent).await?;
+ assert_eq!(obs.len(), 1);
+ assert_eq!(obs[0].path(), format!("{parent}/"));
+ assert_eq!(obs[0].metadata().mode(), EntryMode::DIR);
- let obs = op.lister(&parent).await.map(|_| ());
- assert!(obs.is_err());
- assert_eq!(obs.unwrap_err().kind(), ErrorKind::NotADirectory);
+ op.delete(&file).await?;
Ok(())
}
@@ -371,7 +396,7 @@ pub async fn test_list_with_start_after(op: Operator) ->
Result<()> {
Ok(())
}
-pub async fn test_scan_root(op: Operator) -> Result<()> {
+pub async fn test_list_root_with_recursive(op: Operator) -> Result<()> {
let w = op.lister_with("").recursive(true).await?;
let actual = w
.try_collect::<Vec<_>>()
@@ -386,7 +411,7 @@ pub async fn test_scan_root(op: Operator) -> Result<()> {
}
// Walk top down should output as expected
-pub async fn test_scan(op: Operator) -> Result<()> {
+pub async fn test_list_with_recursive(op: Operator) -> Result<()> {
let parent = uuid::Uuid::new_v4().to_string();
let expected = [