tustvold commented on code in PR #324: URL: https://github.com/apache/arrow-rs-object-store/pull/324#discussion_r2052397996
########## src/lib.rs: ########## @@ -722,6 +722,64 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// Note: the order of returned [`ObjectMeta`] is not guaranteed fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>>; + /// List all the objects with given options defined in [`ListOpts`] + /// + /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar` is a prefix of `foo/bar/x` but not of + /// `foo/bar_baz/x`. List is recursive, i.e. `foo/bar/more/x` will be included. + fn list_opts( + &self, + prefix: Option<&Path>, + options: ListOpts, + ) -> BoxStream<'static, Result<ListResult>> { + // Add the default implementation via calling list_with_offset methods avoid bring + // a breaking change, the default implementation and other list methods will be removed + // in the future. All list request should call this method. + use std::collections::BTreeSet; + + let prefix = prefix.cloned().unwrap_or_default(); + let offset = options.offset.unwrap_or(prefix.clone()); + self.list_with_offset(Some(&prefix), &offset) + .try_chunks(1000) + .map(move |batch| { + let batch = batch.map_err(|e| e.1)?; + if options.delimiter { + let mut common_prefixes = BTreeSet::new(); + let mut objects = Vec::new(); + for obj in batch.into_iter() { + let location = obj.location.clone(); + let mut parts = match location.prefix_match(&prefix) { + Some(parts) => parts, + None => continue, + }; + + match parts.next() { + None => {} + Some(p) => match parts.next() { + None => objects.push(obj), + Some(_) => { + let common_prefix = prefix.child(p); + if common_prefix > offset { + common_prefixes.insert(common_prefix); + } + } + }, + } + drop(parts) + } + Ok(ListResult { + common_prefixes: common_prefixes.into_iter().collect(), + objects, + }) + } else { + Ok(ListResult { + common_prefixes: vec![], + objects: batch, + }) + } + }) + .boxed() Review Comment: ```suggestion // Add the default implementation via calling list_with_offset 
methods avoid bring // a breaking change, the default implementation will be removed, and default // implementations added for the other list methods using it futures::stream::once(futures::future::ready(Err(Error::NotImplemented))).boxed() ``` I don't think there is a coherent way to provide a default implementation here, the below ignores `extensions` and doesn't push down the delimiter. ########## src/lib.rs: ########## @@ -1283,6 +1359,22 @@ pub struct PutResult { pub version: Option<String>, } +/// Options for [`ObjectStore::list_opts`] +#[derive(Debug, Clone, Default)] +pub struct ListOpts { Review Comment: ```suggestion pub struct ListOptions { ``` ########## src/client/list.rs: ########## @@ -42,86 +42,44 @@ pub(crate) trait ListClientExt { fn list_paginated( &self, prefix: Option<&Path>, - delimiter: bool, - offset: Option<&Path>, + opts: ListOpts, ) -> BoxStream<'static, Result<ListResult>>; - - fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>>; - - #[allow(unused)] - fn list_with_offset( - &self, - prefix: Option<&Path>, - offset: &Path, - ) -> BoxStream<'static, Result<ObjectMeta>>; - - async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>; } #[async_trait] impl<T: ListClient + Clone> ListClientExt for T { fn list_paginated( &self, prefix: Option<&Path>, - delimiter: bool, - offset: Option<&Path>, + opts: ListOpts, ) -> BoxStream<'static, Result<ListResult>> { + let ListOpts { + delimiter, + offset, + extensions, + } = opts; + let offset = offset.map(|x| x.to_string()); let prefix = prefix .filter(|x| !x.as_ref().is_empty()) .map(|p| format!("{}{}", p.as_ref(), crate::path::DELIMITER)); stream_paginated( self.clone(), - (prefix, offset), - move |client, (prefix, offset), token| async move { + (prefix, offset, extensions), + move |client, (prefix, offset, extensions), token| async move { let (r, next_token) = client .list_request( prefix.as_deref(), delimiter, token.as_deref(), offset.as_deref(), + 
extensions.clone(), ) .await?; - Ok((r, (prefix, offset), next_token)) + Ok((r, (prefix, offset, extensions), next_token)) }, ) .boxed() } - - fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> { Review Comment: Can we revert this change, please? The purpose of ListClientExt is that the ListClient implementations just implement list_request, and all the ObjectStore list methods are provided for them. Once list_opts is required we can remove ListClientExt, with the contents of these methods moving into the ObjectStore default impls, but until then... ########## src/http/mod.rs: ########## @@ -157,6 +158,93 @@ impl ObjectStore for HttpStore { .boxed() } + fn list_opts( Review Comment: This implementation seems overcomplicated given that WebDAV doesn't support pagination. Could we perhaps reuse more of the existing logic? ########## src/lib.rs: ########## @@ -1283,6 +1359,22 @@ pub struct PutResult { pub version: Option<String>, } +/// Options for [`ObjectStore::list_opts`] +#[derive(Debug, Clone, Default)] +pub struct ListOpts { + /// The listed objects whose locations are greater than `offset` + pub offset: Option<Path>, + /// True means returns common prefixes (directories) in addition to object in [`ListResult`] Review Comment: ```suggestion /// Only list up to the next path delimiter `/` [`ListResult`] /// /// See [`ObjectStore::list_with_delimiter`] ``` ########## src/lib.rs: ########## @@ -1283,6 +1359,22 @@ pub struct PutResult { pub version: Option<String>, } +/// Options for [`ObjectStore::list_opts`] +#[derive(Debug, Clone, Default)] +pub struct ListOpts { + /// The listed objects whose locations are greater than `offset` + pub offset: Option<Path>, + /// True means returns common prefixes (directories) in addition to object in [`ListResult`] + pub delimiter: bool, + /// Implementation-specific extensions.
Intended for use by [`ObjectStore`] implementations + /// that need to pass context-specific information (like tracing spans) via trait methods. + /// + /// These extensions are ignored entirely by backends offered through this crate. + /// + /// They are also eclused from [`PartialEq`] and [`Eq`]. Review Comment: ```suggestion ``` I appreciate this is copied, but it is also not true. ########## src/client/list.rs: ########## @@ -42,86 +42,44 @@ pub(crate) trait ListClientExt { fn list_paginated( &self, prefix: Option<&Path>, - delimiter: bool, - offset: Option<&Path>, + opts: ListOpts, ) -> BoxStream<'static, Result<ListResult>>; - - fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>>; - - #[allow(unused)] - fn list_with_offset( - &self, - prefix: Option<&Path>, - offset: &Path, - ) -> BoxStream<'static, Result<ObjectMeta>>; - - async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>; } #[async_trait] impl<T: ListClient + Clone> ListClientExt for T { Review Comment: Can we add a list_opts impl here, to avoid duplicating the plumbing logic in the 3 cloud provider stores? ########## src/integration.rs: ########## @@ -960,6 +960,102 @@ pub async fn list_with_delimiter(storage: &DynObjectStore) { assert!(content_list.is_empty()); } +/// Tests listing with composite conditions by [`ObjectStore::list_opts`] +pub async fn list_with_composite_conditions(storage: &DynObjectStore) { Review Comment: This doesn't appear to be wired into all of the integration tests? ########## src/lib.rs: ########## @@ -1283,6 +1359,22 @@ pub struct PutResult { pub version: Option<String>, } +/// Options for [`ObjectStore::list_opts`] +#[derive(Debug, Clone, Default)] +pub struct ListOpts { + /// The listed objects whose locations are greater than `offset` Review Comment: ```suggestion /// Only list objects greater than `offset` ``` -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org