tustvold commented on code in PR #5543:
URL: https://github.com/apache/arrow-datafusion/pull/5543#discussion_r1143247554


##########
datafusion/execution/src/object_store.rs:
##########
@@ -156,78 +158,64 @@ impl std::fmt::Debug for ObjectStoreRegistry {
     }
 }
 
-impl Default for ObjectStoreRegistry {
+impl Default for DefaultObjectStoreRegistry {
     fn default() -> Self {
         Self::new()
     }
 }
 
-impl ObjectStoreRegistry {
-    /// Create an [`ObjectStoreRegistry`] with no [`ObjectStoreProvider`].
-    ///
-    /// This will register [`LocalFileSystem`] to handle `file://` paths, 
further stores
-    /// will need to be explicitly registered with calls to 
[`ObjectStoreRegistry::register_store`]
+impl DefaultObjectStoreRegistry {
+    /// This will register [`LocalFileSystem`] to handle `file://` paths
     pub fn new() -> Self {
-        ObjectStoreRegistry::new_with_provider(None)
-    }
-
-    /// Create an [`ObjectStoreRegistry`] with the provided 
[`ObjectStoreProvider`]
-    ///
-    /// This will register [`LocalFileSystem`] to handle `file://` paths, 
further stores
-    /// may be explicity registered with calls to 
[`ObjectStoreRegistry::register_store`] or
-    /// created lazily, on-demand by the provided [`ObjectStoreProvider`]
-    pub fn new_with_provider(provider: Option<Arc<dyn ObjectStoreProvider>>) 
-> Self {
         let object_stores: DashMap<String, Arc<dyn ObjectStore>> = 
DashMap::new();
-        object_stores.insert("file://".to_string(), 
Arc::new(LocalFileSystem::new()));
-        Self {
-            object_stores,
-            provider,
-        }
+        let url = Url::parse("file://").unwrap();
+        let key = get_url_key(&url);
+        object_stores.insert(key, Arc::new(LocalFileSystem::new()));
+        Self { object_stores }
     }
+}
 
-    /// Adds a new store to this registry.
-    ///
-    /// If a store with the same schema and host existed before, it is 
replaced and returned
-    pub fn register_store(
+impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
+    fn register_store(
         &self,
-        scheme: impl AsRef<str>,
-        host: impl AsRef<str>,
+        url: &Url,
         store: Arc<dyn ObjectStore>,
     ) -> Option<Arc<dyn ObjectStore>> {
-        let s = format!("{}://{}", scheme.as_ref(), host.as_ref());
+        let s = get_url_key(url);
         self.object_stores.insert(s, store)
     }
 
-    /// Get a suitable store for the provided URL. For example:
-    ///
-    /// - URL with scheme `file:///` or no schema will return the default 
LocalFS store
-    /// - URL with scheme `s3://bucket/` will return the S3 store if it's 
registered
-    /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store 
if it's registered
-    ///
-    pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn 
ObjectStore>> {
-        let url = url.as_ref();
-        // First check whether can get object store from registry
-        let s = &url[url::Position::BeforeScheme..url::Position::BeforePath];
-        let store = self.object_stores.get(s).map(|o| o.value().clone());
-
-        match store {
-            Some(store) => Ok(store),
-            None => match &self.provider {
-                Some(provider) => {
-                    let store = provider.get_by_url(url)?;
-                    let key =
-                        
&url[url::Position::BeforeScheme..url::Position::BeforePath];
-                    self.object_stores.insert(key.to_owned(), store.clone());
-                    Ok(store)
-                }
-                None => Err(DataFusionError::Internal(format!(
+    /// The [`DefaultObjectStoreRegistry`] will only depend on the inner 
object store cache
+    /// to decide whether it's able to find an [`ObjectStore`] for a url. No 
ad-hoc discovery
+    /// and lazy registration will be executed.
+    fn get_or_lazy_register_store(&self, url: &Url) -> Result<Arc<dyn 
ObjectStore>> {
+        let s = get_url_key(url);
+        self.object_stores
+            .get(&s)
+            .map(|o| o.value().clone())
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
                     "No suitable object store found for {url}"
-                ))),
-            },
-        }
+                ))
+            })
     }
 }
 
+/// Get the key of a url for object store registration.
+/// The credential info will be removed
+fn get_url_key(url: &Url) -> String {
+    let port_info = url
+        .port()
+        .map(|port| format!(":{port}"))
+        .unwrap_or(String::new());
+    format!(
+        "{}://{}{}/",
+        &url[url::Position::BeforeScheme..url::Position::AfterScheme],
+        &url[url::Position::BeforeHost..url::Position::AfterHost],
+        port_info
+    )

Review Comment:
   ```suggestion
       format!(
           "{}://{}",
           url.scheme(),
           &url[url::Position::BeforeHost..url::Position::AfterPort],
       )
   ```
   
   Note: this also removes the trailing `/`, which I _think_ shouldn't be part of the key



##########
datafusion/execution/src/object_store.rs:
##########
@@ -132,18 +115,37 @@ pub trait ObjectStoreProvider: Send + Sync + 'static {
 /// buckets using [`ObjectStoreRegistry::register_store`]
 ///
 /// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can 
create [`ObjectStore`]
-/// lazily, on-demand using [`ObjectStoreProvider`]
+/// lazily
 ///
 /// [`ListingTableUrl`]: crate::datasource::listing::ListingTableUrl
-pub struct ObjectStoreRegistry {
+pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
+    /// If a store with the same key existed before, it is replaced and 
returned
+    fn register_store(
+        &self,
+        url: &Url,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>>;
+
+    /// Get a suitable store for the provided URL. For example:
+    ///
+    /// - URL with scheme `file:///` or no schema will return the default 
LocalFS store
+    /// - URL with scheme `s3://bucket/` will return the S3 store
+    /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store
+    ///
+    /// If no [`ObjectStore`] found for the `url`, ad-hoc discovery may be 
executed depending on
+    /// the `url`. An [`ObjectStore`] may be lazily created and registered.

Review Comment:
   ```suggestion
       /// If no [`ObjectStore`] is found for the `url`, ad-hoc discovery may be 
executed depending on
       /// the `url` and the [`ObjectStoreRegistry`] implementation
   ```



##########
datafusion/execution/src/object_store.rs:
##########
@@ -156,78 +158,64 @@ impl std::fmt::Debug for ObjectStoreRegistry {
     }
 }
 
-impl Default for ObjectStoreRegistry {
+impl Default for DefaultObjectStoreRegistry {
     fn default() -> Self {
         Self::new()
     }
 }
 
-impl ObjectStoreRegistry {
-    /// Create an [`ObjectStoreRegistry`] with no [`ObjectStoreProvider`].
-    ///
-    /// This will register [`LocalFileSystem`] to handle `file://` paths, 
further stores
-    /// will need to be explicitly registered with calls to 
[`ObjectStoreRegistry::register_store`]
+impl DefaultObjectStoreRegistry {
+    /// This will register [`LocalFileSystem`] to handle `file://` paths
     pub fn new() -> Self {
-        ObjectStoreRegistry::new_with_provider(None)
-    }
-
-    /// Create an [`ObjectStoreRegistry`] with the provided 
[`ObjectStoreProvider`]
-    ///
-    /// This will register [`LocalFileSystem`] to handle `file://` paths, 
further stores
-    /// may be explicity registered with calls to 
[`ObjectStoreRegistry::register_store`] or
-    /// created lazily, on-demand by the provided [`ObjectStoreProvider`]
-    pub fn new_with_provider(provider: Option<Arc<dyn ObjectStoreProvider>>) 
-> Self {
         let object_stores: DashMap<String, Arc<dyn ObjectStore>> = 
DashMap::new();
-        object_stores.insert("file://".to_string(), 
Arc::new(LocalFileSystem::new()));
-        Self {
-            object_stores,
-            provider,
-        }
+        let url = Url::parse("file://").unwrap();
+        let key = get_url_key(&url);
+        object_stores.insert(key, Arc::new(LocalFileSystem::new()));

Review Comment:
   ```suggestion
           object_stores.insert("file://".to_string(), 
Arc::new(LocalFileSystem::new()));
   ```
   
   See the comment on `get_url_key` about removing the trailing `/`



##########
datafusion/execution/src/object_store.rs:
##########
@@ -132,18 +115,37 @@ pub trait ObjectStoreProvider: Send + Sync + 'static {
 /// buckets using [`ObjectStoreRegistry::register_store`]
 ///
 /// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can 
create [`ObjectStore`]
-/// lazily, on-demand using [`ObjectStoreProvider`]
+/// lazily

Review Comment:
   ```suggestion
   /// lazily by providing a custom implementation of [`ObjectStoreRegistry`]
   ```



##########
datafusion-cli/src/object_storage.rs:
##########
@@ -43,28 +46,52 @@ impl FromStr for ObjectStoreScheme {
     }
 }
 
-#[derive(Debug)]
-pub struct DatafusionCliObjectStoreProvider {}
+#[derive(Debug, Default)]

Review Comment:
   ```suggestion
   /// An [`ObjectStoreRegistry`] that can automatically create S3 and GCS 
stores for a given URL
   #[derive(Debug, Default)]
   ```



##########
datafusion/execution/src/object_store.rs:
##########
@@ -132,18 +115,37 @@ pub trait ObjectStoreProvider: Send + Sync + 'static {
 /// buckets using [`ObjectStoreRegistry::register_store`]
 ///
 /// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can 
create [`ObjectStore`]
-/// lazily, on-demand using [`ObjectStoreProvider`]
+/// lazily
 ///
 /// [`ListingTableUrl`]: crate::datasource::listing::ListingTableUrl
-pub struct ObjectStoreRegistry {
+pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
+    /// If a store with the same key existed before, it is replaced and 
returned
+    fn register_store(
+        &self,
+        url: &Url,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>>;
+
+    /// Get a suitable store for the provided URL. For example:
+    ///
+    /// - URL with scheme `file:///` or no schema will return the default 
LocalFS store
+    /// - URL with scheme `s3://bucket/` will return the S3 store
+    /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store

Review Comment:
   ```suggestion
       /// Get a suitable store for the provided URL
   ```
   
   These are outdated now



##########
datafusion/execution/src/object_store.rs:
##########
@@ -156,78 +158,64 @@ impl std::fmt::Debug for ObjectStoreRegistry {
     }
 }
 
-impl Default for ObjectStoreRegistry {
+impl Default for DefaultObjectStoreRegistry {
     fn default() -> Self {
         Self::new()
     }
 }
 
-impl ObjectStoreRegistry {
-    /// Create an [`ObjectStoreRegistry`] with no [`ObjectStoreProvider`].
-    ///
-    /// This will register [`LocalFileSystem`] to handle `file://` paths, 
further stores
-    /// will need to be explicitly registered with calls to 
[`ObjectStoreRegistry::register_store`]
+impl DefaultObjectStoreRegistry {
+    /// This will register [`LocalFileSystem`] to handle `file://` paths
     pub fn new() -> Self {
-        ObjectStoreRegistry::new_with_provider(None)
-    }
-
-    /// Create an [`ObjectStoreRegistry`] with the provided 
[`ObjectStoreProvider`]
-    ///
-    /// This will register [`LocalFileSystem`] to handle `file://` paths, 
further stores
-    /// may be explicity registered with calls to 
[`ObjectStoreRegistry::register_store`] or
-    /// created lazily, on-demand by the provided [`ObjectStoreProvider`]
-    pub fn new_with_provider(provider: Option<Arc<dyn ObjectStoreProvider>>) 
-> Self {
         let object_stores: DashMap<String, Arc<dyn ObjectStore>> = 
DashMap::new();
-        object_stores.insert("file://".to_string(), 
Arc::new(LocalFileSystem::new()));
-        Self {
-            object_stores,
-            provider,
-        }
+        let url = Url::parse("file://").unwrap();
+        let key = get_url_key(&url);
+        object_stores.insert(key, Arc::new(LocalFileSystem::new()));
+        Self { object_stores }
     }
+}
 
-    /// Adds a new store to this registry.
-    ///
-    /// If a store with the same schema and host existed before, it is 
replaced and returned
-    pub fn register_store(
+impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
+    fn register_store(
         &self,
-        scheme: impl AsRef<str>,
-        host: impl AsRef<str>,
+        url: &Url,
         store: Arc<dyn ObjectStore>,
     ) -> Option<Arc<dyn ObjectStore>> {
-        let s = format!("{}://{}", scheme.as_ref(), host.as_ref());
+        let s = get_url_key(url);
         self.object_stores.insert(s, store)
     }
 
-    /// Get a suitable store for the provided URL. For example:
-    ///
-    /// - URL with scheme `file:///` or no schema will return the default 
LocalFS store
-    /// - URL with scheme `s3://bucket/` will return the S3 store if it's 
registered
-    /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store 
if it's registered
-    ///
-    pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn 
ObjectStore>> {
-        let url = url.as_ref();
-        // First check whether can get object store from registry
-        let s = &url[url::Position::BeforeScheme..url::Position::BeforePath];
-        let store = self.object_stores.get(s).map(|o| o.value().clone());
-
-        match store {
-            Some(store) => Ok(store),
-            None => match &self.provider {
-                Some(provider) => {
-                    let store = provider.get_by_url(url)?;
-                    let key =
-                        
&url[url::Position::BeforeScheme..url::Position::BeforePath];
-                    self.object_stores.insert(key.to_owned(), store.clone());
-                    Ok(store)
-                }
-                None => Err(DataFusionError::Internal(format!(
+    /// The [`DefaultObjectStoreRegistry`] will only depend on the inner 
object store cache
+    /// to decide whether it's able to find an [`ObjectStore`] for a url. No 
ad-hoc discovery
+    /// and lazy registration will be executed.

Review Comment:
   ```suggestion
   ```
   
   These docs aren't rendered by docs.rs, so I moved this onto the type definition 
and reworded it slightly



##########
datafusion-cli/src/object_storage.rs:
##########
@@ -43,28 +46,52 @@ impl FromStr for ObjectStoreScheme {
     }
 }
 
-#[derive(Debug)]
-pub struct DatafusionCliObjectStoreProvider {}
+#[derive(Debug, Default)]
+pub struct DatafusionCliObjectStoreRegistry {
+    inner: DefaultObjectStoreRegistry,
+}
+
+impl DatafusionCliObjectStoreRegistry {
+    pub fn new() -> Self {
+        Default::default()
+    }
+}
+
+impl ObjectStoreRegistry for DatafusionCliObjectStoreRegistry {
+    fn register_store(
+        &self,
+        url: &Url,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        self.inner.register_store(url, store)
+    }
+
+    fn get_or_lazy_register_store(&self, url: &Url) -> Result<Arc<dyn 
ObjectStore>> {

Review Comment:
   ```suggestion
       fn get_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
   ```
   
   The lazy creation is an implementation detail of certain 
`ObjectStoreRegistry` implementations; I'm not sure it needs to be encoded in the name



##########
datafusion/execution/src/object_store.rs:
##########
@@ -132,18 +115,37 @@ pub trait ObjectStoreProvider: Send + Sync + 'static {
 /// buckets using [`ObjectStoreRegistry::register_store`]
 ///
 /// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can 
create [`ObjectStore`]
-/// lazily, on-demand using [`ObjectStoreProvider`]
+/// lazily
 ///
 /// [`ListingTableUrl`]: crate::datasource::listing::ListingTableUrl
-pub struct ObjectStoreRegistry {
+pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
+    /// If a store with the same key existed before, it is replaced and 
returned
+    fn register_store(
+        &self,
+        url: &Url,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>>;
+
+    /// Get a suitable store for the provided URL. For example:
+    ///
+    /// - URL with scheme `file:///` or no schema will return the default 
LocalFS store
+    /// - URL with scheme `s3://bucket/` will return the S3 store
+    /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store
+    ///
+    /// If no [`ObjectStore`] found for the `url`, ad-hoc discovery may be 
executed depending on
+    /// the `url`. An [`ObjectStore`] may be lazily created and registered.
+    fn get_or_lazy_register_store(&self, url: &Url) -> Result<Arc<dyn 
ObjectStore>>;
+}
+
+/// The default [`ObjectStoreRegistry`]
+pub struct DefaultObjectStoreRegistry {

Review Comment:
   ```suggestion
   ///
   /// Stores are registered based on the scheme, host and port of the provided 
URL
   /// with a [`LocalFileSystem::new`] automatically registered for `file://`
   ///
   /// For example:
   ///
   /// - `file:///my_path` will return the default LocalFS store
   /// - `s3://bucket/path` will return a store registered with `s3://bucket` 
if any
   /// - `hdfs://host:port/path` will return a store registered with 
`hdfs://host:port` if any
   pub struct DefaultObjectStoreRegistry {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to