tustvold commented on code in PR #324:
URL: 
https://github.com/apache/arrow-rs-object-store/pull/324#discussion_r2040703373


##########
src/aws/builder.rs:
##########
@@ -891,6 +893,12 @@ impl AmazonS3Builder {
         self
     }
 
+    /// Set the max keys per list request. It's almost used for test paginated 
listing.

Review Comment:
   This functionality never appears to be used or tested; is it necessary?



##########
src/prefix.rs:
##########
@@ -165,6 +165,36 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
         s.map_ok(move |meta| strip_meta(&slf_prefix, meta)).boxed()
     }
 
+    fn list_opts(
+        &self,
+        prefix: Option<&Path>,
+        options: ListOpts,
+    ) -> BoxStream<'static, Result<ListResult>> {
+        let prefix = self.full_path(prefix.unwrap_or(&Path::default()));
+        let offset = options.offset.map(|p| self.full_path(&p));
+        let opts = ListOpts {
+            offset,
+            delimiter: options.delimiter,
+            max_keys: options.max_keys,
+            extensions: options.extensions,
+        };
+        let s = self.inner.list_opts(Some(&prefix), opts);
+
+        s.map_ok(move |lst| ListResult {

Review Comment:
   Perhaps we could encapsulate this; it is identical to `list_with_delimiter` 
below.



##########
src/memory.rs:
##########
@@ -311,52 +311,70 @@ impl ObjectStore for InMemory {
         Ok(())
     }
 
-    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, 
Result<ObjectMeta>> {
-        let root = Path::default();
-        let prefix = prefix.unwrap_or(&root);
+    fn list_opts(
+        &self,
+        prefix: Option<&Path>,
+        options: ListOpts,
+    ) -> BoxStream<'static, Result<ListResult>> {
+        if options.delimiter {
+            self.list_with_delimiter(prefix, options.offset.as_ref(), 
options.max_keys)
+        } else {
+            self.list_without_delimiter(prefix, options.offset.as_ref(), 
options.max_keys)
+        }
+    }
 
-        let storage = self.storage.read();
-        let values: Vec<_> = storage
-            .map
-            .range((prefix)..)
-            .take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref()))
-            .filter(|(key, _)| {
-                // Don't return for exact prefix match
-                key.prefix_match(prefix)
-                    .map(|mut x| x.next().is_some())
-                    .unwrap_or(false)
-            })
-            .map(|(key, value)| {
-                Ok(ObjectMeta {
-                    location: key.clone(),
-                    last_modified: value.last_modified,
-                    size: value.data.len() as u64,
-                    e_tag: Some(value.e_tag.to_string()),
-                    version: None,
-                })
-            })
-            .collect();
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        let entry = self.entry(from)?;
+        self.storage
+            .write()
+            .insert(to, entry.data, entry.attributes);
+        Ok(())
+    }
 
-        futures::stream::iter(values).boxed()
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        let entry = self.entry(from)?;
+        let mut storage = self.storage.write();
+        if storage.map.contains_key(to) {
+            return Err(Error::AlreadyExists {
+                path: to.to_string(),
+            }
+            .into());
+        }
+        storage.insert(to, entry.data, entry.attributes);
+        Ok(())
     }
+}
 
-    /// The memory implementation returns all results, as opposed to the cloud
-    /// versions which limit their results to 1k or more because of API
-    /// limitations.
-    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> 
Result<ListResult> {
+impl InMemory {
+    fn list_with_delimiter(
+        &self,
+        prefix: Option<&Path>,
+        offset: Option<&Path>,
+        max_keys: Option<usize>,
+    ) -> BoxStream<'static, Result<ListResult>> {
         let root = Path::default();
         let prefix = prefix.unwrap_or(&root);
+        let offset = offset.unwrap_or(&root);
 
         let mut common_prefixes = BTreeSet::new();
-
         // Only objects in this base level should be returned in the
         // response. Otherwise, we just collect the common prefixes.
         let mut objects = vec![];
         for (k, v) in self.storage.read().map.range((prefix)..) {
+            if let Some(may_keys) = max_keys {

Review Comment:
   You could lift the conditional out by doing `let max_keys = 
max_keys.unwrap_or(usize::MAX)`.



##########
src/lib.rs:
##########
@@ -720,7 +721,22 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
     /// `foo/bar_baz/x`. List is recursive, i.e. `foo/bar/more/x` will be 
included.
     ///
     /// Note: the order of returned [`ObjectMeta`] is not guaranteed
-    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, 
Result<ObjectMeta>>;
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, 
Result<ObjectMeta>> {
+        self.list_opts(prefix, ListOpts::default())
+            .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
+            .try_flatten()
+            .boxed()
+    }
+
+    /// List all the objects with given options defined in [`ListOpts`]
+    ///
+    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar` is a 
prefix of `foo/bar/x` but not of
+    /// `foo/bar_baz/x`. List is recursive, i.e. `foo/bar/more/x` will be 
included.
+    fn list_opts(

Review Comment:
   This formulation is a breaking change. If we want to get this in any time in 
the next few months, we probably want to instead provide a default 
implementation that calls through to `list`, erroring if not provided.



##########
src/memory.rs:
##########
@@ -383,31 +401,57 @@ impl ObjectStore for InMemory {
             }
         }
 
-        Ok(ListResult {
+        let result = Ok(ListResult {
             objects,
             common_prefixes: common_prefixes.into_iter().collect(),
-        })
+        });
+        futures::stream::once(async { result }).boxed()
     }
+    fn list_without_delimiter(
+        &self,
+        prefix: Option<&Path>,
+        offset: Option<&Path>,
+        max_keys: Option<usize>,
+    ) -> BoxStream<'static, Result<ListResult>> {
+        let root = Path::default();
+        let prefix = prefix.unwrap_or(&root);
 
-    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
-        let entry = self.entry(from)?;
-        self.storage
-            .write()
-            .insert(to, entry.data, entry.attributes);
-        Ok(())
-    }
+        let storage = self.storage.read();
+        let mut values: Vec<_> = storage
+            .map
+            .range(prefix..)
+            .take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref()))
+            .filter(|(key, _)| {
+                // Don't return for exact prefix match
+                key.prefix_match(prefix)
+                    .map(|mut x| x.next().is_some())
+                    .unwrap_or(false)
+            })
+            .filter(|(key, _)| offset.map(|o| key > &o).unwrap_or(true))
+            .map(|(key, value)| ObjectMeta {
+                location: key.clone(),
+                last_modified: value.last_modified,
+                size: value.data.len() as u64,
+                e_tag: Some(value.e_tag.to_string()),
+                version: None,
+            })
+            .collect();
 
-    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
-        let entry = self.entry(from)?;
-        let mut storage = self.storage.write();
-        if storage.map.contains_key(to) {
-            return Err(Error::AlreadyExists {
-                path: to.to_string(),
+        let objects = match max_keys {
+            Some(max_keys) if max_keys < values.len() => {
+                values.truncate(max_keys);
+                values
             }

Review Comment:
   Could we instead use `Stream::take`?



##########
src/memory.rs:
##########
@@ -311,52 +311,70 @@ impl ObjectStore for InMemory {
         Ok(())
     }
 
-    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, 
Result<ObjectMeta>> {
-        let root = Path::default();
-        let prefix = prefix.unwrap_or(&root);
+    fn list_opts(
+        &self,
+        prefix: Option<&Path>,
+        options: ListOpts,
+    ) -> BoxStream<'static, Result<ListResult>> {
+        if options.delimiter {
+            self.list_with_delimiter(prefix, options.offset.as_ref(), 
options.max_keys)
+        } else {
+            self.list_without_delimiter(prefix, options.offset.as_ref(), 
options.max_keys)
+        }
+    }
 
-        let storage = self.storage.read();
-        let values: Vec<_> = storage
-            .map
-            .range((prefix)..)
-            .take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref()))
-            .filter(|(key, _)| {
-                // Don't return for exact prefix match
-                key.prefix_match(prefix)
-                    .map(|mut x| x.next().is_some())
-                    .unwrap_or(false)
-            })
-            .map(|(key, value)| {
-                Ok(ObjectMeta {
-                    location: key.clone(),
-                    last_modified: value.last_modified,
-                    size: value.data.len() as u64,
-                    e_tag: Some(value.e_tag.to_string()),
-                    version: None,
-                })
-            })
-            .collect();
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        let entry = self.entry(from)?;
+        self.storage
+            .write()
+            .insert(to, entry.data, entry.attributes);
+        Ok(())
+    }
 
-        futures::stream::iter(values).boxed()
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        let entry = self.entry(from)?;
+        let mut storage = self.storage.write();
+        if storage.map.contains_key(to) {
+            return Err(Error::AlreadyExists {
+                path: to.to_string(),
+            }
+            .into());
+        }
+        storage.insert(to, entry.data, entry.attributes);
+        Ok(())
     }
+}
 
-    /// The memory implementation returns all results, as opposed to the cloud
-    /// versions which limit their results to 1k or more because of API
-    /// limitations.
-    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> 
Result<ListResult> {
+impl InMemory {
+    fn list_with_delimiter(
+        &self,
+        prefix: Option<&Path>,
+        offset: Option<&Path>,
+        max_keys: Option<usize>,
+    ) -> BoxStream<'static, Result<ListResult>> {
         let root = Path::default();
         let prefix = prefix.unwrap_or(&root);
+        let offset = offset.unwrap_or(&root);
 
         let mut common_prefixes = BTreeSet::new();
-
         // Only objects in this base level should be returned in the
         // response. Otherwise, we just collect the common prefixes.
         let mut objects = vec![];
         for (k, v) in self.storage.read().map.range((prefix)..) {
+            if let Some(may_keys) = max_keys {
+                if common_prefixes.len() + objects.len() >= may_keys {
+                    break;
+                }
+            }
+
             if !k.as_ref().starts_with(prefix.as_ref()) {
                 break;
             }
 
+            if k <= offset {

Review Comment:
   Could we just start listing from `offset` instead of `prefix`, if 
provided?



##########
src/lib.rs:
##########
@@ -745,7 +767,29 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
     ///
     /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar` is a 
prefix of `foo/bar/x` but not of
     /// `foo/bar_baz/x`. List is not recursive, i.e. `foo/bar/more/x` will not 
be included.
-    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> 
Result<ListResult>;
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> 
Result<ListResult> {
+        let mut stream = self.list_opts(
+            prefix,
+            ListOpts {
+                delimiter: true,
+                ..ListOpts::default()
+            },
+        );
+
+        let mut common_prefixes = BTreeSet::new();
+        let mut objects = Vec::new();
+
+        while let Some(result) = stream.next().await {
+            let response = result?;
+            common_prefixes.extend(response.common_prefixes.into_iter());

Review Comment:
   Will this potentially result in the same common_prefix appearing multiple 
times?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to