This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 38a79ae4e Filter exact list prefix matches for MemoryStore and HttpStore (#3712) (#3713)
38a79ae4e is described below
commit 38a79ae4e4bff70b3d74f7582f9c4f4dbff62b69
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Feb 13 22:29:51 2023 +0000
Filter exact list prefix matches for MemoryStore and HttpStore (#3712) (#3713)
* Filter exact list prefix matches for MemoryStore and HttpStore (#3712)
* Update object_store/src/lib.rs
Co-authored-by: Andrew Lamb <[email protected]>
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
object_store/src/http/mod.rs | 14 ++++++++++++--
object_store/src/lib.rs | 20 ++++++++++++++++++++
object_store/src/memory.rs | 21 +++++++++++++++++----
3 files changed, 49 insertions(+), 6 deletions(-)
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index f05e70024..c91faa235 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -37,6 +37,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
+use itertools::Itertools;
use snafu::{OptionExt, ResultExt, Snafu};
use tokio::io::AsyncWrite;
use url::Url;
@@ -163,6 +164,7 @@ impl ObjectStore for HttpStore {
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default();
let status = self.client.list(prefix, "infinity").await?;
Ok(futures::stream::iter(
status
@@ -172,7 +174,9 @@ impl ObjectStore for HttpStore {
.map(|response| {
response.check_ok()?;
response.object_meta(self.client.base_url())
- }),
+ })
+ // Filter out exact prefix matches
+ .filter_ok(move |r| r.location.as_ref().len() > prefix_len),
)
.boxed())
}
@@ -186,7 +190,13 @@ impl ObjectStore for HttpStore {
for response in status.response {
response.check_ok()?;
match response.is_dir() {
- false =>
objects.push(response.object_meta(self.client.base_url())?),
+ false => {
+ let meta = response.object_meta(self.client.base_url())?;
+ // Filter out exact prefix matches
+ if meta.location.as_ref().len() > prefix_len {
+ objects.push(meta);
+ }
+ }
true => {
let path = response.path(self.client.base_url())?;
// Exclude the current object
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 8c202886b..6a3275bb0 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -911,9 +911,29 @@ mod tests {
let content_list = flatten_list_stream(storage,
Some(&prefix)).await.unwrap();
assert_eq!(content_list, &[location1.clone()]);
+ let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
+ assert_eq!(result.objects.len(), 1);
+ assert_eq!(result.objects[0].location, location1);
+ assert_eq!(result.common_prefixes, &[]);
+
+ // Listing an existing path (file) should return an empty list:
+ // https://github.com/apache/arrow-rs/issues/3712
+ let content_list = flatten_list_stream(storage, Some(&location1))
+ .await
+ .unwrap();
+ assert_eq!(content_list, &[]);
+
+ let list =
storage.list_with_delimiter(Some(&location1)).await.unwrap();
+ assert_eq!(list.objects, &[]);
+ assert_eq!(list.common_prefixes, &[]);
+
let prefix = Path::from("foo/x");
let content_list = flatten_list_stream(storage,
Some(&prefix)).await.unwrap();
assert_eq!(content_list, &[]);
+
+ let list = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
+ assert_eq!(list.objects, &[]);
+ assert_eq!(list.common_prefixes, &[]);
}
pub(crate) async fn list_with_delimiter(storage: &DynObjectStore) {
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index 372164c2b..40eee55a1 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -163,13 +163,21 @@ impl ObjectStore for InMemory {
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+ let root = Path::default();
+ let prefix = prefix.unwrap_or(&root);
let last_modified = Utc::now();
let storage = self.storage.read();
let values: Vec<_> = storage
- .iter()
- .filter(move |(key, _)| prefix.map(|p|
key.prefix_matches(p)).unwrap_or(true))
- .map(move |(key, value)| {
+ .range((prefix)..)
+ .take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref()))
+ .filter(|(key, _)| {
+ // Don't return for exact prefix match
+ key.prefix_match(prefix)
+ .map(|mut x| x.next().is_some())
+ .unwrap_or(false)
+ })
+ .map(|(key, value)| {
Ok(ObjectMeta {
location: key.clone(),
last_modified,
@@ -195,14 +203,19 @@ impl ObjectStore for InMemory {
// response. Otherwise, we just collect the common prefixes.
let mut objects = vec![];
for (k, v) in self.storage.read().range((prefix)..) {
+ if !k.as_ref().starts_with(prefix.as_ref()) {
+ break;
+ }
+
let mut parts = match k.prefix_match(prefix) {
Some(parts) => parts,
- None => break,
+ None => continue,
};
// Pop first element
let common_prefix = match parts.next() {
Some(p) => p,
+ // Should only return children of the prefix
None => continue,
};