This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new 72088de  Add ObjectStoreRegistry (#347) (#375)
72088de is described below

commit 72088de1eb51de41dc4fcb77c76d464f41cb1a04
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Sun May 25 12:32:09 2025 +0100

    Add ObjectStoreRegistry (#347) (#375)
    
    * Add ObjectStoreRegistry (#347)
    
    * Make path segment based
    
    * Fix doc
    
    * Additional test
    
    * Handle race
    
    * Review feedback
    
    * Fix prefix bug
    
    * Review feedback
---
 src/lib.rs      |   1 +
 src/parse.rs    |   9 ++
 src/registry.rs | 340 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 350 insertions(+)

diff --git a/src/lib.rs b/src/lib.rs
index 4743f96..80e91d7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -524,6 +524,7 @@ pub mod local;
 pub mod memory;
 pub mod path;
 pub mod prefix;
+pub mod registry;
 #[cfg(feature = "cloud")]
 pub mod signer;
 pub mod throttle;
diff --git a/src/parse.rs b/src/parse.rs
index e37f85b..0733dd2 100644
--- a/src/parse.rs
+++ b/src/parse.rs
@@ -286,6 +286,15 @@ mod tests {
                 "s3://bucket/foo%20bar",
                 (ObjectStoreScheme::AmazonS3, "foo bar"),
             ),
+            (
+                "s3://bucket/foo bar",
+                (ObjectStoreScheme::AmazonS3, "foo bar"),
+            ),
+            ("s3://bucket/😀", (ObjectStoreScheme::AmazonS3, "😀")),
+            (
+                "s3://bucket/%F0%9F%98%80",
+                (ObjectStoreScheme::AmazonS3, "😀"),
+            ),
             (
                 "https://foo/bar%20baz";,
                 (ObjectStoreScheme::Http, "bar baz"),
diff --git a/src/registry.rs b/src/registry.rs
new file mode 100644
index 0000000..81770c5
--- /dev/null
+++ b/src/registry.rs
@@ -0,0 +1,340 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Map object URLs to [`ObjectStore`]
+
+use crate::path::{InvalidPart, Path, PathPart};
+use crate::{parse_url_opts, ObjectStore};
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::sync::Arc;
+use url::Url;
+
+/// [`ObjectStoreRegistry`] maps a URL to an [`ObjectStore`] instance
+pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
+    /// Register a new store for the provided store URL
+    ///
+    /// If a store with the same URL existed before, it is replaced and 
returned
+    fn register(&self, url: Url, store: Arc<dyn ObjectStore>) -> 
Option<Arc<dyn ObjectStore>>;
+
+    /// Resolve an object URL
+    ///
+    /// If [`ObjectStoreRegistry::register`] has been called with a URL with 
the same
+    /// scheme, and authority as the object URL, and a path that is a prefix 
of the object
+    /// URL's, it should be returned along with the trailing path. Paths 
should be matched
+    /// on a path segment basis, and in the event of multiple possibilities 
the longest
+    /// path match should be returned.
+    ///
+    /// If a store hasn't been registered, an [`ObjectStoreRegistry`] may 
lazily create
+    /// one if the URL is understood
+    ///
+    /// For example
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use url::Url;
+    /// # use object_store::memory::InMemory;
+    /// # use object_store::ObjectStore;
+    /// # use object_store::prefix::PrefixStore;
+    /// # use object_store::registry::{DefaultObjectStoreRegistry, 
ObjectStoreRegistry};
+    /// #
+    /// let registry = DefaultObjectStoreRegistry::new();
+    ///
+    /// let bucket1 = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+    /// let base = Url::parse("s3://bucket1/").unwrap();
+    /// registry.register(base, bucket1.clone());
+    ///
+    /// let url = Url::parse("s3://bucket1/path/to/object").unwrap();
+    /// let (ret, path) = registry.resolve(&url).unwrap();
+    /// assert_eq!(path.as_ref(), "path/to/object");
+    /// assert!(Arc::ptr_eq(&ret, &bucket1));
+    ///
+    /// let bucket2 = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+    /// let base = 
Url::parse("https://s3.region.amazonaws.com/bucket";).unwrap();
+    /// registry.register(base, bucket2.clone());
+    ///
+    /// let url = 
Url::parse("https://s3.region.amazonaws.com/bucket/path/to/object";).unwrap();
+    /// let (ret, path) = registry.resolve(&url).unwrap();
+    /// assert_eq!(path.as_ref(), "path/to/object");
+    /// assert!(Arc::ptr_eq(&ret, &bucket2));
+    ///
+    /// let bucket3 = Arc::new(PrefixStore::new(InMemory::new(), "path")) as 
Arc<dyn ObjectStore>;
+    /// let base = 
Url::parse("https://s3.region.amazonaws.com/bucket/path";).unwrap();
+    /// registry.register(base, bucket3.clone());
+    ///
+    /// let url = 
Url::parse("https://s3.region.amazonaws.com/bucket/path/to/object";).unwrap();
+    /// let (ret, path) = registry.resolve(&url).unwrap();
+    /// assert_eq!(path.as_ref(), "to/object");
+    /// assert!(Arc::ptr_eq(&ret, &bucket3));
+    /// ```
+    fn resolve(&self, url: &Url) -> crate::Result<(Arc<dyn ObjectStore>, 
Path)>;
+}
+
+/// Error type for [`DefaultObjectStoreRegistry`]
+///
+/// Crate private/opaque type to make the error handling code more ergonomic.
+/// Always converted into `crate::Error` when reported externally.
+#[derive(Debug, thiserror::Error)]
+#[non_exhaustive]
+enum Error {
+    #[error("ObjectStore not found")]
+    NotFound,
+
+    #[error("Error parsing URL path segment")]
+    InvalidPart(#[from] InvalidPart),
+}
+
+impl From<Error> for crate::Error {
+    fn from(value: Error) -> Self {
+        Self::Generic {
+            store: "ObjectStoreRegistry",
+            source: Box::new(value),
+        }
+    }
+}
+
+/// An [`ObjectStoreRegistry`] that uses [`parse_url_opts`] to create stores 
based on the environment
+#[derive(Debug, Default)]
+pub struct DefaultObjectStoreRegistry {
+    /// Mapping from [`url_key`] to [`PathEntry`]
+    map: RwLock<HashMap<String, PathEntry>>,
+}
+
+/// [`PathEntry`] construct a tree of path segments starting from the root
+///
+/// For example the following paths
+///
+/// * `/` => store1
+/// * `/foo/bar` => store2
+///
+/// Would be represented by
+///
+/// ```yaml
+/// store: Some(store1)
+/// children:
+///   foo:
+///     store: None
+///     children:
+///       bar:
+///         store: Some(store2)
+/// ```
+///
+#[derive(Debug, Default)]
+struct PathEntry {
+    /// Store, if defined at this path
+    store: Option<Arc<dyn ObjectStore>>,
+    /// Child [`PathEntry`], keyed by the next path segment in their path
+    children: HashMap<String, Self>,
+}
+
+impl PathEntry {
+    /// Lookup a store based on URL path
+    ///
+    /// Returns the store and its path segment depth
+    fn lookup(&self, to_resolve: &Url) -> Option<(&Arc<dyn ObjectStore>, 
usize)> {
+        let mut current = self;
+        let mut ret = self.store.as_ref().map(|store| (store, 0));
+        let mut depth = 0;
+        // Traverse the PathEntry tree to find the longest match
+        for segment in path_segments(to_resolve.path()) {
+            match current.children.get(segment) {
+                Some(e) => {
+                    current = e;
+                    depth += 1;
+                    if let Some(store) = &current.store {
+                        ret = Some((store, depth))
+                    }
+                }
+                None => break,
+            }
+        }
+        ret
+    }
+}
+
+impl DefaultObjectStoreRegistry {
+    /// Create a new [`DefaultObjectStoreRegistry`]
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
+impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
+    fn register(&self, url: Url, store: Arc<dyn ObjectStore>) -> 
Option<Arc<dyn ObjectStore>> {
+        let mut map = self.map.write();
+        let key = url_key(&url);
+        let mut entry = map.entry(key.to_string()).or_default();
+
+        for segment in path_segments(url.path()) {
+            entry = entry.children.entry(segment.to_string()).or_default();
+        }
+        entry.store.replace(store)
+    }
+
+    fn resolve(&self, to_resolve: &Url) -> crate::Result<(Arc<dyn 
ObjectStore>, Path)> {
+        let key = url_key(to_resolve);
+        {
+            let map = self.map.read();
+
+            if let Some((store, depth)) = map.get(key).and_then(|entry| 
entry.lookup(to_resolve)) {
+                let path = path_suffix(to_resolve, depth)?;
+                return Ok((Arc::clone(store), path));
+            }
+        }
+
+        if let Ok((store, path)) = parse_url_opts(to_resolve, 
std::env::vars()) {
+            let depth = num_segments(to_resolve.path()) - 
num_segments(path.as_ref());
+
+            let mut map = self.map.write();
+            let mut entry = map.entry(key.to_string()).or_default();
+            for segment in path_segments(to_resolve.path()).take(depth) {
+                entry = entry.children.entry(segment.to_string()).or_default();
+            }
+            let store = Arc::clone(match &entry.store {
+                None => entry.store.insert(Arc::from(store)),
+                Some(x) => x, // Racing creation - use existing
+            });
+
+            let path = path_suffix(to_resolve, depth)?;
+            return Ok((store, path));
+        }
+
+        Err(Error::NotFound.into())
+    }
+}
+
+/// Extracts the scheme and authority of a URL (components before the Path)
+fn url_key(url: &Url) -> &str {
+    &url[..url::Position::AfterPort]
+}
+
+/// Returns the non-empty segments of a path
+///
+/// Note: We don't use [`Url::path_segments`] as we only want non-empty paths
+fn path_segments(s: &str) -> impl Iterator<Item = &str> {
+    s.split('/').filter(|x| !x.is_empty())
+}
+
+/// Returns the number of non-empty path segments in a path
+fn num_segments(s: &str) -> usize {
+    path_segments(s).count()
+}
+
+/// Returns the path of `url` skipping the first `depth` segments
+fn path_suffix(url: &Url, depth: usize) -> Result<Path, Error> {
+    let segments = path_segments(url.path()).skip(depth);
+    let path = segments.map(PathPart::parse).collect::<Result<_, _>>()?;
+    Ok(path)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::memory::InMemory;
+    use crate::prefix::PrefixStore;
+
+    #[test]
+    fn test_num_segments() {
+        assert_eq!(num_segments(""), 0);
+        assert_eq!(num_segments("/"), 0);
+        assert_eq!(num_segments("/banana"), 1);
+        assert_eq!(num_segments("banana"), 1);
+        assert_eq!(num_segments("/banana/crumble"), 2);
+        assert_eq!(num_segments("banana/crumble"), 2);
+    }
+
+    #[test]
+    fn test_default_registry() {
+        let registry = DefaultObjectStoreRegistry::new();
+
+        // Should automatically register in memory store
+        let banana_url = Url::parse("memory:///banana").unwrap();
+        let (resolved, path) = registry.resolve(&banana_url).unwrap();
+        assert_eq!(path.as_ref(), "banana");
+
+        // Should replace store
+        let url = Url::parse("memory:///").unwrap();
+        let root = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let replaced = registry.register(url, Arc::clone(&root)).unwrap();
+        assert!(Arc::ptr_eq(&resolved, &replaced));
+
+        // Should not replace store
+        let banana = Arc::new(PrefixStore::new(InMemory::new(), "banana")) as 
Arc<dyn ObjectStore>;
+        assert!(registry
+            .register(banana_url.clone(), Arc::clone(&banana))
+            .is_none());
+
+        // Should resolve to banana store
+        let (resolved, path) = registry.resolve(&banana_url).unwrap();
+        assert_eq!(path.as_ref(), "");
+        assert!(Arc::ptr_eq(&resolved, &banana));
+
+        // If we register another store it still resolves banana
+        let apples_url = Url::parse("memory:///apples").unwrap();
+        let apples = Arc::new(PrefixStore::new(InMemory::new(), "apples")) as 
Arc<dyn ObjectStore>;
+        assert!(registry.register(apples_url, Arc::clone(&apples)).is_none());
+
+        // Should still resolve to banana store
+        let (resolved, path) = registry.resolve(&banana_url).unwrap();
+        assert_eq!(path.as_ref(), "");
+        assert!(Arc::ptr_eq(&resolved, &banana));
+
+        // Should be path segment based
+        let banana_muffins_url = 
Url::parse("memory:///banana_muffins").unwrap();
+        let (resolved, path) = registry.resolve(&banana_muffins_url).unwrap();
+        assert_eq!(path.as_ref(), "banana_muffins");
+        assert!(Arc::ptr_eq(&resolved, &root));
+
+        // Should resolve to root even though path contains prefix of valid 
store
+        let to_resolve = Url::parse("memory:///foo/banana").unwrap();
+        let (resolved, path) = registry.resolve(&to_resolve).unwrap();
+        assert_eq!(path.as_ref(), "foo/banana");
+        assert!(Arc::ptr_eq(&resolved, &root));
+
+        let nested_url = Url::parse("memory:///apples/bananas").unwrap();
+        let nested =
+            Arc::new(PrefixStore::new(InMemory::new(), "apples/bananas")) as 
Arc<dyn ObjectStore>;
+        assert!(registry.register(nested_url, Arc::clone(&nested)).is_none());
+
+        let to_resolve = 
Url::parse("memory:///apples/bananas/muffins/cupcakes").unwrap();
+        let (resolved, path) = registry.resolve(&to_resolve).unwrap();
+        assert_eq!(path.as_ref(), "muffins/cupcakes");
+        assert!(Arc::ptr_eq(&resolved, &nested));
+
+        let nested_url2 = Url::parse("memory:///1/2/3").unwrap();
+        let nested2 = Arc::new(PrefixStore::new(InMemory::new(), "1/2/3")) as 
Arc<dyn ObjectStore>;
+        assert!(registry
+            .register(nested_url2, Arc::clone(&nested2))
+            .is_none());
+
+        let to_resolve = Url::parse("memory:///1/2/3/4/5/6").unwrap();
+        let (resolved, path) = registry.resolve(&to_resolve).unwrap();
+        assert_eq!(path.as_ref(), "4/5/6");
+        assert!(Arc::ptr_eq(&resolved, &nested2));
+
+        let custom_scheme_url = Url::parse("custom:///").unwrap();
+        let custom_scheme = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        assert!(registry
+            .register(custom_scheme_url, Arc::clone(&custom_scheme))
+            .is_none());
+
+        let to_resolve = Url::parse("custom:///6/7").unwrap();
+        let (resolved, path) = registry.resolve(&to_resolve).unwrap();
+        assert_eq!(path.as_ref(), "6/7");
+        assert!(Arc::ptr_eq(&resolved, &custom_scheme));
+    }
+}

Reply via email to