tustvold commented on code in PR #324: URL: https://github.com/apache/arrow-rs-object-store/pull/324#discussion_r2040703373
########## src/aws/builder.rs: ########## @@ -891,6 +893,12 @@ impl AmazonS3Builder { self } + /// Set the max keys per list request. It's almost used for test paginated listing. Review Comment: This functionality never appears to be used or tested; is it necessary? ########## src/prefix.rs: ########## @@ -165,6 +165,36 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> { s.map_ok(move |meta| strip_meta(&slf_prefix, meta)).boxed() } + fn list_opts( + &self, + prefix: Option<&Path>, + options: ListOpts, + ) -> BoxStream<'static, Result<ListResult>> { + let prefix = self.full_path(prefix.unwrap_or(&Path::default())); + let offset = options.offset.map(|p| self.full_path(&p)); + let opts = ListOpts { + offset, + delimiter: options.delimiter, + max_keys: options.max_keys, + extensions: options.extensions, + }; + let s = self.inner.list_opts(Some(&prefix), opts); + + s.map_ok(move |lst| ListResult { Review Comment: Perhaps we could encapsulate this; it is identical to `list_with_delimiter` below. ########## src/memory.rs: ########## @@ -311,52 +311,70 @@ impl ObjectStore for InMemory { Ok(()) } - fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> { - let root = Path::default(); - let prefix = prefix.unwrap_or(&root); + fn list_opts( + &self, + prefix: Option<&Path>, + options: ListOpts, + ) -> BoxStream<'static, Result<ListResult>> { + if options.delimiter { + self.list_with_delimiter(prefix, options.offset.as_ref(), options.max_keys) + } else { + self.list_without_delimiter(prefix, options.offset.as_ref(), options.max_keys) + } + } - let storage = self.storage.read(); - let values: Vec<_> = storage - .map - .range((prefix)..) 
- .take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref())) - .filter(|(key, _)| { - // Don't return for exact prefix match - key.prefix_match(prefix) - .map(|mut x| x.next().is_some()) - .unwrap_or(false) - }) - .map(|(key, value)| { - Ok(ObjectMeta { - location: key.clone(), - last_modified: value.last_modified, - size: value.data.len() as u64, - e_tag: Some(value.e_tag.to_string()), - version: None, - }) - }) - .collect(); + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + let entry = self.entry(from)?; + self.storage + .write() + .insert(to, entry.data, entry.attributes); + Ok(()) + } - futures::stream::iter(values).boxed() + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + let entry = self.entry(from)?; + let mut storage = self.storage.write(); + if storage.map.contains_key(to) { + return Err(Error::AlreadyExists { + path: to.to_string(), + } + .into()); + } + storage.insert(to, entry.data, entry.attributes); + Ok(()) } +} - /// The memory implementation returns all results, as opposed to the cloud - /// versions which limit their results to 1k or more because of API - /// limitations. - async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> { +impl InMemory { + fn list_with_delimiter( + &self, + prefix: Option<&Path>, + offset: Option<&Path>, + max_keys: Option<usize>, + ) -> BoxStream<'static, Result<ListResult>> { let root = Path::default(); let prefix = prefix.unwrap_or(&root); + let offset = offset.unwrap_or(&root); let mut common_prefixes = BTreeSet::new(); - // Only objects in this base level should be returned in the // response. Otherwise, we just collect the common prefixes. let mut objects = vec![]; for (k, v) in self.storage.read().map.range((prefix)..) { + if let Some(may_keys) = max_keys { Review Comment: You could lift the conditional out by doing `let max_keys = max_keys.unwrap_or(usize::MAX)`. 
########## src/lib.rs: ########## @@ -720,7 +721,22 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// `foo/bar_baz/x`. List is recursive, i.e. `foo/bar/more/x` will be included. /// /// Note: the order of returned [`ObjectMeta`] is not guaranteed - fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>>; + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> { + self.list_opts(prefix, ListOpts::default()) + .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok))) + .try_flatten() + .boxed() + } + + /// List all the objects with given options defined in [`ListOpts`] + /// + /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar` is a prefix of `foo/bar/x` but not of + /// `foo/bar_baz/x`. List is recursive, i.e. `foo/bar/more/x` will be included. + fn list_opts( Review Comment: This formulation is a breaking change. If we want to get this in any time in the next few months, we probably want to instead provide a default implementation of `list_opts` that calls through to `list`, erroring if not provided. ########## src/memory.rs: ########## @@ -383,31 +401,57 @@ impl ObjectStore for InMemory { } } - Ok(ListResult { + let result = Ok(ListResult { objects, common_prefixes: common_prefixes.into_iter().collect(), - }) + }); + futures::stream::once(async { result }).boxed() } + fn list_without_delimiter( + &self, + prefix: Option<&Path>, + offset: Option<&Path>, + max_keys: Option<usize>, + ) -> BoxStream<'static, Result<ListResult>> { + let root = Path::default(); + let prefix = prefix.unwrap_or(&root); - async fn copy(&self, from: &Path, to: &Path) -> Result<()> { - let entry = self.entry(from)?; - self.storage - .write() - .insert(to, entry.data, entry.attributes); - Ok(()) - } + let storage = self.storage.read(); + let mut values: Vec<_> = storage + .map + .range(prefix..) 
+ .take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref())) + .filter(|(key, _)| { + // Don't return for exact prefix match + key.prefix_match(prefix) + .map(|mut x| x.next().is_some()) + .unwrap_or(false) + }) + .filter(|(key, _)| offset.map(|o| key > &o).unwrap_or(true)) + .map(|(key, value)| ObjectMeta { + location: key.clone(), + last_modified: value.last_modified, + size: value.data.len() as u64, + e_tag: Some(value.e_tag.to_string()), + version: None, + }) + .collect(); - async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - let entry = self.entry(from)?; - let mut storage = self.storage.write(); - if storage.map.contains_key(to) { - return Err(Error::AlreadyExists { - path: to.to_string(), + let objects = match max_keys { + Some(max_keys) if max_keys < values.len() => { + values.truncate(max_keys); + values } Review Comment: Could we instead use `Stream::take`? ########## src/memory.rs: ########## @@ -311,52 +311,70 @@ impl ObjectStore for InMemory { Ok(()) } - fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> { - let root = Path::default(); - let prefix = prefix.unwrap_or(&root); + fn list_opts( + &self, + prefix: Option<&Path>, + options: ListOpts, + ) -> BoxStream<'static, Result<ListResult>> { + if options.delimiter { + self.list_with_delimiter(prefix, options.offset.as_ref(), options.max_keys) + } else { + self.list_without_delimiter(prefix, options.offset.as_ref(), options.max_keys) + } + } - let storage = self.storage.read(); - let values: Vec<_> = storage - .map - .range((prefix)..) 
- .take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref())) - .filter(|(key, _)| { - // Don't return for exact prefix match - key.prefix_match(prefix) - .map(|mut x| x.next().is_some()) - .unwrap_or(false) - }) - .map(|(key, value)| { - Ok(ObjectMeta { - location: key.clone(), - last_modified: value.last_modified, - size: value.data.len() as u64, - e_tag: Some(value.e_tag.to_string()), - version: None, - }) - }) - .collect(); + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + let entry = self.entry(from)?; + self.storage + .write() + .insert(to, entry.data, entry.attributes); + Ok(()) + } - futures::stream::iter(values).boxed() + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + let entry = self.entry(from)?; + let mut storage = self.storage.write(); + if storage.map.contains_key(to) { + return Err(Error::AlreadyExists { + path: to.to_string(), + } + .into()); + } + storage.insert(to, entry.data, entry.attributes); + Ok(()) } +} - /// The memory implementation returns all results, as opposed to the cloud - /// versions which limit their results to 1k or more because of API - /// limitations. - async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> { +impl InMemory { + fn list_with_delimiter( + &self, + prefix: Option<&Path>, + offset: Option<&Path>, + max_keys: Option<usize>, + ) -> BoxStream<'static, Result<ListResult>> { let root = Path::default(); let prefix = prefix.unwrap_or(&root); + let offset = offset.unwrap_or(&root); let mut common_prefixes = BTreeSet::new(); - // Only objects in this base level should be returned in the // response. Otherwise, we just collect the common prefixes. let mut objects = vec![]; for (k, v) in self.storage.read().map.range((prefix)..) 
{ + if let Some(may_keys) = max_keys { + if common_prefixes.len() + objects.len() >= may_keys { + break; + } + } + if !k.as_ref().starts_with(prefix.as_ref()) { break; } + if k <= offset { Review Comment: Could we instead just start listing from `offset` instead of `prefix` if provided? ########## src/lib.rs: ########## @@ -745,7 +767,29 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar` is a prefix of `foo/bar/x` but not of /// `foo/bar_baz/x`. List is not recursive, i.e. `foo/bar/more/x` will not be included. - async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>; + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> { + let mut stream = self.list_opts( + prefix, + ListOpts { + delimiter: true, + ..ListOpts::default() + }, + ); + + let mut common_prefixes = BTreeSet::new(); + let mut objects = Vec::new(); + + while let Some(result) = stream.next().await { + let response = result?; + common_prefixes.extend(response.common_prefixes.into_iter()); Review Comment: Will this potentially result in the same common_prefix appearing multiple times? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org