This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a8b5dae14 Change ObjectStoreRegistry from struct to trait to provide polymorphism (#5543)
3a8b5dae14 is described below

commit 3a8b5dae14aac21712281d1ff4e9737cbfd35813
Author: yahoNanJing <[email protected]>
AuthorDate: Wed Mar 22 11:39:08 2023 +0800

    Change ObjectStoreRegistry from struct to trait to provide polymorphism (#5543)
    
    * Introduce ObjectStoreManager trait for the ObjectStoreRegistry to provide polymorphism for get_by_url
    
    * Change ObjectStoreRegistry from struct to trait
    
    * Fix for review comments
    
    * Minor fix
    
    * Use url instead of &str as the parameter for register_store
    
    * Remove register_with_scheme_and_host
    
    * Minor fix
    
    * Fix for review comments
    
    ---------
    
    Co-authored-by: yangzhong <[email protected]>
---
 datafusion-cli/src/main.rs                         |   7 +-
 datafusion-cli/src/object_storage.rs               |  90 ++++++++----
 datafusion-examples/Cargo.toml                     |   1 +
 datafusion-examples/examples/query-aws-s3.rs       |   5 +-
 datafusion/core/src/datasource/listing/mod.rs      |  39 ++---
 .../core/src/physical_plan/file_format/avro.rs     |   4 +-
 .../core/src/physical_plan/file_format/csv.rs      |   8 +-
 .../core/src/physical_plan/file_format/json.rs     |   4 +-
 datafusion/core/src/test/object_store.rs           |   4 +-
 datafusion/core/tests/path_partition.rs            |   9 +-
 datafusion/execution/src/object_store.rs           | 159 +++++++++++----------
 datafusion/execution/src/runtime_env.rs            |  39 ++---
 12 files changed, 210 insertions(+), 159 deletions(-)

diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 6d7b29837c..8061f4d516 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -16,13 +16,12 @@
 // under the License.
 
 use clap::Parser;
-use datafusion::datasource::object_store::ObjectStoreRegistry;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionConfig;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::prelude::SessionContext;
 use datafusion_cli::catalog::DynamicFileCatalog;
-use datafusion_cli::object_storage::DatafusionCliObjectStoreProvider;
+use datafusion_cli::object_storage::DatafusionCliObjectStoreRegistry;
 use datafusion_cli::{
     exec, print_format::PrintFormat, print_options::PrintOptions, 
DATAFUSION_CLI_VERSION,
 };
@@ -149,9 +148,7 @@ pub async fn main() -> Result<()> {
 }
 
 fn create_runtime_env() -> Result<RuntimeEnv> {
-    let object_store_provider = DatafusionCliObjectStoreProvider {};
-    let object_store_registry =
-        
ObjectStoreRegistry::new_with_provider(Some(Arc::new(object_store_provider)));
+    let object_store_registry = DatafusionCliObjectStoreRegistry::new();
     let rn_config =
         
RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry));
     RuntimeEnv::new(rn_config)
diff --git a/datafusion-cli/src/object_storage.rs 
b/datafusion-cli/src/object_storage.rs
index 56401c42f5..46a42c927b 100644
--- a/datafusion-cli/src/object_storage.rs
+++ b/datafusion-cli/src/object_storage.rs
@@ -18,8 +18,11 @@
 use datafusion::error::Result;
 use std::{env, str::FromStr, sync::Arc};
 
-use datafusion::{datasource::object_store::ObjectStoreProvider, 
error::DataFusionError};
-use object_store::{aws::AmazonS3Builder, gcp::GoogleCloudStorageBuilder};
+use datafusion::datasource::object_store::{
+    DefaultObjectStoreRegistry, ObjectStoreRegistry,
+};
+use datafusion::error::DataFusionError;
+use object_store::{aws::AmazonS3Builder, gcp::GoogleCloudStorageBuilder, 
ObjectStore};
 use url::Url;
 
 #[derive(Debug, PartialEq, Eq, clap::ArgEnum, Clone)]
@@ -43,20 +46,45 @@ impl FromStr for ObjectStoreScheme {
     }
 }
 
-#[derive(Debug)]
-pub struct DatafusionCliObjectStoreProvider {}
+/// An [`ObjectStoreRegistry`] that can automatically create S3 and GCS stores 
for a given URL
+#[derive(Debug, Default)]
+pub struct DatafusionCliObjectStoreRegistry {
+    inner: DefaultObjectStoreRegistry,
+}
+
+impl DatafusionCliObjectStoreRegistry {
+    pub fn new() -> Self {
+        Default::default()
+    }
+}
+
+impl ObjectStoreRegistry for DatafusionCliObjectStoreRegistry {
+    fn register_store(
+        &self,
+        url: &Url,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        self.inner.register_store(url, store)
+    }
+
+    fn get_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
+        self.inner.get_store(url).or_else(|_| {
+            let store =
+                ObjectStoreScheme::from_str(url.scheme()).map(
+                    |scheme| match scheme {
+                        ObjectStoreScheme::S3 => build_s3_object_store(url),
+                        ObjectStoreScheme::GCS => build_gcs_object_store(url),
+                    },
+                )??;
+
+            self.inner.register_store(url, store.clone());
 
-/// ObjectStoreProvider for S3 and GCS
-impl ObjectStoreProvider for DatafusionCliObjectStoreProvider {
-    fn get_by_url(&self, url: &Url) -> Result<Arc<dyn 
object_store::ObjectStore>> {
-        ObjectStoreScheme::from_str(url.scheme()).map(|scheme| match scheme {
-            ObjectStoreScheme::S3 => build_s3_object_store(url),
-            ObjectStoreScheme::GCS => build_gcs_object_store(url),
-        })?
+            Ok(store)
+        })
     }
 }
 
-fn build_s3_object_store(url: &Url) -> Result<Arc<dyn 
object_store::ObjectStore>> {
+fn build_s3_object_store(url: &Url) -> Result<Arc<dyn ObjectStore>> {
     let host = get_host_name(url)?;
     match AmazonS3Builder::from_env().with_bucket_name(host).build() {
         Ok(s3) => Ok(Arc::new(s3)),
@@ -64,7 +92,7 @@ fn build_s3_object_store(url: &Url) -> Result<Arc<dyn 
object_store::ObjectStore>
     }
 }
 
-fn build_gcs_object_store(url: &Url) -> Result<Arc<dyn 
object_store::ObjectStore>> {
+fn build_gcs_object_store(url: &Url) -> Result<Arc<dyn ObjectStore>> {
     let host = get_host_name(url)?;
     let mut builder = GoogleCloudStorageBuilder::new().with_bucket_name(host);
 
@@ -90,17 +118,17 @@ fn get_host_name(url: &Url) -> Result<&str> {
 mod tests {
     use std::{env, str::FromStr};
 
-    use datafusion::datasource::object_store::ObjectStoreProvider;
+    use datafusion::datasource::object_store::ObjectStoreRegistry;
     use url::Url;
 
-    use super::DatafusionCliObjectStoreProvider;
+    use super::DatafusionCliObjectStoreRegistry;
 
     #[test]
     fn s3_provider_no_host() {
         let no_host_url = "s3:///";
-        let provider = DatafusionCliObjectStoreProvider {};
-        let err = provider
-            .get_by_url(&Url::from_str(no_host_url).unwrap())
+        let registry = DatafusionCliObjectStoreRegistry::new();
+        let err = registry
+            .get_store(&Url::from_str(no_host_url).unwrap())
             .unwrap_err();
         assert!(err
             .to_string()
@@ -110,9 +138,9 @@ mod tests {
     #[test]
     fn gs_provider_no_host() {
         let no_host_url = "gs:///";
-        let provider = DatafusionCliObjectStoreProvider {};
-        let err = provider
-            .get_by_url(&Url::from_str(no_host_url).unwrap())
+        let registry = DatafusionCliObjectStoreRegistry::new();
+        let err = registry
+            .get_store(&Url::from_str(no_host_url).unwrap())
             .unwrap_err();
         assert!(err
             .to_string()
@@ -122,9 +150,9 @@ mod tests {
     #[test]
     fn gcs_provider_no_host() {
         let no_host_url = "gcs:///";
-        let provider = DatafusionCliObjectStoreProvider {};
-        let err = provider
-            .get_by_url(&Url::from_str(no_host_url).unwrap())
+        let registry = DatafusionCliObjectStoreRegistry::new();
+        let err = registry
+            .get_store(&Url::from_str(no_host_url).unwrap())
             .unwrap_err();
         assert!(err
             .to_string()
@@ -134,9 +162,9 @@ mod tests {
     #[test]
     fn unknown_object_store_type() {
         let unknown = "unknown://bucket_name/path";
-        let provider = DatafusionCliObjectStoreProvider {};
-        let err = provider
-            .get_by_url(&Url::from_str(unknown).unwrap())
+        let registry = DatafusionCliObjectStoreRegistry::new();
+        let err = registry
+            .get_store(&Url::from_str(unknown).unwrap())
             .unwrap_err();
         assert!(err
             .to_string()
@@ -146,15 +174,15 @@ mod tests {
     #[test]
     fn s3_region_validation() {
         let s3 = "s3://bucket_name/path";
-        let provider = DatafusionCliObjectStoreProvider {};
-        let err = provider
-            .get_by_url(&Url::from_str(s3).unwrap())
+        let registry = DatafusionCliObjectStoreRegistry::new();
+        let err = registry
+            .get_store(&Url::from_str(s3).unwrap())
             .unwrap_err();
         assert!(err.to_string().contains("Generic S3 error: Missing region"));
 
         env::set_var("AWS_REGION", "us-east-1");
         let url = Url::from_str(s3).expect("Unable to parse s3 url");
-        let res = provider.get_by_url(&url);
+        let res = registry.get_store(&url);
         let msg = match res {
             Err(e) => format!("{e}"),
             Ok(_) => "".to_string(),
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 613d3e934a..6e5c62ce82 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -57,4 +57,5 @@ serde = { version = "1.0.136", features = ["derive"] }
 serde_json = "1.0.82"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", 
"sync", "parking_lot"] }
 tonic = "0.8"
+url = "2.2"
 uuid = "1.2"
diff --git a/datafusion-examples/examples/query-aws-s3.rs 
b/datafusion-examples/examples/query-aws-s3.rs
index 3788167312..cbb6486b4e 100644
--- a/datafusion-examples/examples/query-aws-s3.rs
+++ b/datafusion-examples/examples/query-aws-s3.rs
@@ -20,6 +20,7 @@ use datafusion::prelude::*;
 use object_store::aws::AmazonS3Builder;
 use std::env;
 use std::sync::Arc;
+use url::Url;
 
 /// This example demonstrates querying data in an S3 bucket.
 ///
@@ -45,8 +46,10 @@ async fn main() -> Result<()> {
         .with_secret_access_key(env::var("AWS_SECRET_ACCESS_KEY").unwrap())
         .build()?;
 
+    let path = format!("s3://{bucket_name}");
+    let s3_url = Url::parse(&path).unwrap();
     ctx.runtime_env()
-        .register_object_store("s3", bucket_name, Arc::new(s3));
+        .register_object_store(&s3_url, Arc::new(s3));
 
     // cannot query the parquet files from this bucket because the path 
contains a whitespace
     // and we don't support that yet
diff --git a/datafusion/core/src/datasource/listing/mod.rs 
b/datafusion/core/src/datasource/listing/mod.rs
index 1e6b40a348..c7a1761151 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -115,10 +115,13 @@ impl From<ObjectMeta> for PartitionedFile {
 
 #[cfg(test)]
 mod tests {
-    use datafusion_execution::object_store::ObjectStoreRegistry;
+    use crate::datasource::listing::ListingTableUrl;
+    use datafusion_execution::object_store::{
+        DefaultObjectStoreRegistry, ObjectStoreRegistry,
+    };
     use object_store::local::LocalFileSystem;
-
-    use super::*;
+    use std::sync::Arc;
+    use url::Url;
 
     #[test]
     fn test_object_store_listing_url() {
@@ -132,32 +135,34 @@ mod tests {
     }
 
     #[test]
-    fn test_get_by_url_hdfs() {
-        let sut = ObjectStoreRegistry::default();
-        sut.register_store("hdfs", "localhost:8020", 
Arc::new(LocalFileSystem::new()));
+    fn test_get_store_hdfs() {
+        let sut = DefaultObjectStoreRegistry::default();
+        let url = Url::parse("hdfs://localhost:8020").unwrap();
+        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
         let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
-        sut.get_by_url(&url).unwrap();
+        sut.get_store(url.as_ref()).unwrap();
     }
 
     #[test]
-    fn test_get_by_url_s3() {
-        let sut = ObjectStoreRegistry::default();
-        sut.register_store("s3", "bucket", Arc::new(LocalFileSystem::new()));
+    fn test_get_store_s3() {
+        let sut = DefaultObjectStoreRegistry::default();
+        let url = Url::parse("s3://bucket/key").unwrap();
+        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
         let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
-        sut.get_by_url(&url).unwrap();
+        sut.get_store(url.as_ref()).unwrap();
     }
 
     #[test]
-    fn test_get_by_url_file() {
-        let sut = ObjectStoreRegistry::default();
+    fn test_get_store_file() {
+        let sut = DefaultObjectStoreRegistry::default();
         let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
-        sut.get_by_url(&url).unwrap();
+        sut.get_store(url.as_ref()).unwrap();
     }
 
     #[test]
-    fn test_get_by_url_local() {
-        let sut = ObjectStoreRegistry::default();
+    fn test_get_store_local() {
+        let sut = DefaultObjectStoreRegistry::default();
         let url = ListingTableUrl::parse("../").unwrap();
-        sut.get_by_url(&url).unwrap();
+        sut.get_store(url.as_ref()).unwrap();
     }
 }
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs 
b/datafusion/core/src/physical_plan/file_format/avro.rs
index e4fc570feb..ed27dfac03 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -221,6 +221,7 @@ mod tests {
     use object_store::local::LocalFileSystem;
     use object_store::ObjectStore;
     use rstest::*;
+    use url::Url;
 
     use super::*;
 
@@ -245,9 +246,10 @@ mod tests {
         let session_ctx = SessionContext::new();
         let state = session_ctx.state();
 
+        let url = Url::parse("file://").unwrap();
         state
             .runtime_env()
-            .register_object_store("file", "", store.clone());
+            .register_object_store(&url, store.clone());
 
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{testdata}/avro/alltypes_plain.avro");
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs 
b/datafusion/core/src/physical_plan/file_format/csv.rs
index 3fc7df4f10..d9075c84ad 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -337,6 +337,7 @@ mod tests {
     use std::fs::File;
     use std::io::Write;
     use tempfile::TempDir;
+    use url::Url;
 
     #[rstest(
         file_compression_type,
@@ -656,8 +657,8 @@ mod tests {
         store: Arc<dyn ObjectStore>,
     ) {
         let ctx = SessionContext::new();
-        ctx.runtime_env()
-            .register_object_store("file", "", store.clone());
+        let url = Url::parse("file://").unwrap();
+        ctx.runtime_env().register_object_store(&url, store.clone());
 
         let task_ctx = ctx.task_ctx();
 
@@ -717,9 +718,10 @@ mod tests {
         let path = object_store::path::Path::from("a.csv");
         store.put(&path, data).await.unwrap();
 
+        let url = Url::parse("memory://").unwrap();
         session_ctx
             .runtime_env()
-            .register_object_store("memory", "", Arc::new(store));
+            .register_object_store(&url, Arc::new(store));
 
         let df = session_ctx
             .read_csv("memory:///", CsvReadOptions::new())
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs 
b/datafusion/core/src/physical_plan/file_format/json.rs
index 146227335c..ebbae74178 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -331,8 +331,8 @@ mod tests {
         store: Arc<dyn ObjectStore>,
     ) {
         let ctx = SessionContext::new();
-        ctx.runtime_env()
-            .register_object_store("file", "", store.clone());
+        let url = Url::parse("file://").unwrap();
+        ctx.runtime_env().register_object_store(&url, store.clone());
         let filename = "1.json";
         let file_groups = partitioned_file_groups(
             TEST_DATA_BASE,
diff --git a/datafusion/core/src/test/object_store.rs 
b/datafusion/core/src/test/object_store.rs
index e61653e882..3e2d1fdae0 100644
--- a/datafusion/core/src/test/object_store.rs
+++ b/datafusion/core/src/test/object_store.rs
@@ -19,11 +19,13 @@ use crate::prelude::SessionContext;
 use futures::FutureExt;
 use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
 use std::sync::Arc;
+use url::Url;
 
 /// Returns a test object store with the provided `ctx`
 pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
+    let url = Url::parse("test://").unwrap();
     ctx.runtime_env()
-        .register_object_store("test", "", make_test_store(files));
+        .register_object_store(&url, make_test_store(files));
 }
 
 /// Create a test object store with the provided files
diff --git a/datafusion/core/tests/path_partition.rs 
b/datafusion/core/tests/path_partition.rs
index 6c7f1431af..b93ad02aa2 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -45,6 +45,7 @@ use object_store::{
     path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
 };
 use tokio::io::AsyncWrite;
+use url::Url;
 
 #[tokio::test]
 async fn parquet_distinct_partition_col() -> Result<()> {
@@ -523,9 +524,9 @@ fn register_partitioned_aggregate_csv(
     let testdata = arrow_test_data();
     let csv_file_path = format!("{testdata}/csv/aggregate_test_100.csv");
     let file_schema = test_util::aggr_test_schema();
+    let url = Url::parse("mirror://").unwrap();
     ctx.runtime_env().register_object_store(
-        "mirror",
-        "",
+        &url,
         MirroringObjectStore::new_arc(csv_file_path, store_paths),
     );
 
@@ -556,9 +557,9 @@ async fn register_partitioned_alltypes_parquet(
 ) {
     let testdata = parquet_test_data();
     let parquet_file_path = format!("{testdata}/{source_file}");
+    let url = Url::parse("mirror://").unwrap();
     ctx.runtime_env().register_object_store(
-        "mirror",
-        "",
+        &url,
         MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths),
     );
 
diff --git a/datafusion/execution/src/object_store.rs 
b/datafusion/execution/src/object_store.rs
index 9b958e3023..c22ec34a60 100644
--- a/datafusion/execution/src/object_store.rs
+++ b/datafusion/execution/src/object_store.rs
@@ -79,21 +79,6 @@ impl std::fmt::Display for ObjectStoreUrl {
     }
 }
 
-/// Provides a mechanism for lazy, on-demand creation of an [`ObjectStore`]
-///
-/// For example, to support reading arbitrary buckets from AWS S3
-/// without instantiating an [`ObjectStore`] for each possible bucket
-/// up front, an [`ObjectStoreProvider`] can be used to create the
-/// appropriate [`ObjectStore`] instance on demand.
-///
-/// See [`ObjectStoreRegistry::new_with_provider`]
-pub trait ObjectStoreProvider: Send + Sync + 'static {
-    /// Return an ObjectStore for the provided url, called by 
[`ObjectStoreRegistry::get_by_url`]
-    /// when no matching store has already been registered. The result will be 
cached based
-    /// on its schema and authority. Any error will be returned to the caller
-    fn get_by_url(&self, url: &Url) -> Result<Arc<dyn ObjectStore>>;
-}
-
 /// [`ObjectStoreRegistry`] maps a URL to an [`ObjectStore`] instance,
 /// and allows DataFusion to read from different [`ObjectStore`]
 /// instances. For example DataFusion might be configured so that
@@ -113,15 +98,13 @@ pub trait ObjectStoreProvider: Send + Sync + 'static {
 /// ```
 ///
 /// In this particular case, the url `s3://my_bucket/lineitem/` will be 
provided to
-/// [`ObjectStoreRegistry::get_by_url`] and one of three things will happen:
+/// [`ObjectStoreRegistry::get_store`] and one of three things will happen:
 ///
 /// - If an [`ObjectStore`] has been registered with 
[`ObjectStoreRegistry::register_store`] with
-/// scheme `s3` and host `my_bucket`, that [`ObjectStore`] will be returned
+/// `s3://my_bucket`, that [`ObjectStore`] will be returned
 ///
-/// - If an [`ObjectStoreProvider`] has been associated with this 
[`ObjectStoreRegistry`] using
-/// [`ObjectStoreRegistry::new_with_provider`], 
[`ObjectStoreProvider::get_by_url`] will be invoked,
-/// and the returned [`ObjectStore`] registered on this 
[`ObjectStoreRegistry`]. Any error will
-/// be returned to the caller
+/// - If an AWS S3 object store can be ad-hoc discovered by the url 
`s3://my_bucket/lineitem/`, this
+/// object store will be registered with key `s3://my_bucket` and returned.
 ///
 /// - Otherwise an error will be returned, indicating that no suitable 
[`ObjectStore`] could
 /// be found
@@ -132,18 +115,38 @@ pub trait ObjectStoreProvider: Send + Sync + 'static {
 /// buckets using [`ObjectStoreRegistry::register_store`]
 ///
 /// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can 
create [`ObjectStore`]
-/// lazily, on-demand using [`ObjectStoreProvider`]
+/// lazily by providing a custom implementation of [`ObjectStoreRegistry`]
 ///
 /// [`ListingTableUrl`]: crate::datasource::listing::ListingTableUrl
-pub struct ObjectStoreRegistry {
+pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
+    /// If a store with the same key existed before, it is replaced and 
returned
+    fn register_store(
+        &self,
+        url: &Url,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>>;
+
+    /// Get a suitable store for the provided URL. For example:
+    ///
+    /// - URL with scheme `file:///` or no scheme will return the default 
LocalFS store
+    /// - URL with scheme `s3://bucket/` will return the S3 store
+    /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store
+    ///
+    /// If no [`ObjectStore`] found for the `url`, ad-hoc discovery may be 
executed depending on
+    /// the `url` and [`ObjectStoreRegistry`] implementation. An 
[`ObjectStore`] may be lazily
+    /// created and registered.
+    fn get_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>>;
+}
+
+/// The default [`ObjectStoreRegistry`]
+pub struct DefaultObjectStoreRegistry {
     /// A map from scheme to object store that serve list / read operations 
for the store
     object_stores: DashMap<String, Arc<dyn ObjectStore>>,
-    provider: Option<Arc<dyn ObjectStoreProvider>>,
 }
 
-impl std::fmt::Debug for ObjectStoreRegistry {
+impl std::fmt::Debug for DefaultObjectStoreRegistry {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        f.debug_struct("ObjectStoreRegistry")
+        f.debug_struct("DefaultObjectStoreRegistry")
             .field(
                 "schemes",
                 &self
@@ -156,78 +159,63 @@ impl std::fmt::Debug for ObjectStoreRegistry {
     }
 }
 
-impl Default for ObjectStoreRegistry {
+impl Default for DefaultObjectStoreRegistry {
     fn default() -> Self {
         Self::new()
     }
 }
 
-impl ObjectStoreRegistry {
-    /// Create an [`ObjectStoreRegistry`] with no [`ObjectStoreProvider`].
-    ///
-    /// This will register [`LocalFileSystem`] to handle `file://` paths, 
further stores
-    /// will need to be explicitly registered with calls to 
[`ObjectStoreRegistry::register_store`]
+impl DefaultObjectStoreRegistry {
+    /// This will register [`LocalFileSystem`] to handle `file://` paths
     pub fn new() -> Self {
-        ObjectStoreRegistry::new_with_provider(None)
-    }
-
-    /// Create an [`ObjectStoreRegistry`] with the provided 
[`ObjectStoreProvider`]
-    ///
-    /// This will register [`LocalFileSystem`] to handle `file://` paths, 
further stores
-    /// may be explicity registered with calls to 
[`ObjectStoreRegistry::register_store`] or
-    /// created lazily, on-demand by the provided [`ObjectStoreProvider`]
-    pub fn new_with_provider(provider: Option<Arc<dyn ObjectStoreProvider>>) 
-> Self {
         let object_stores: DashMap<String, Arc<dyn ObjectStore>> = 
DashMap::new();
         object_stores.insert("file://".to_string(), 
Arc::new(LocalFileSystem::new()));
-        Self {
-            object_stores,
-            provider,
-        }
+        Self { object_stores }
     }
+}
 
-    /// Adds a new store to this registry.
-    ///
-    /// If a store with the same schema and host existed before, it is 
replaced and returned
-    pub fn register_store(
+///
+/// Stores are registered based on the scheme, host and port of the provided 
URL
+/// with a [`LocalFileSystem::new`] automatically registered for `file://`
+///
+/// For example:
+///
+/// - `file:///my_path` will return the default LocalFS store
+/// - `s3://bucket/path` will return a store registered with `s3://bucket` if 
any
+/// - `hdfs://host:port/path` will return a store registered with 
`hdfs://host:port` if any
+impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
+    fn register_store(
         &self,
-        scheme: impl AsRef<str>,
-        host: impl AsRef<str>,
+        url: &Url,
         store: Arc<dyn ObjectStore>,
     ) -> Option<Arc<dyn ObjectStore>> {
-        let s = format!("{}://{}", scheme.as_ref(), host.as_ref());
+        let s = get_url_key(url);
         self.object_stores.insert(s, store)
     }
 
-    /// Get a suitable store for the provided URL. For example:
-    ///
-    /// - URL with scheme `file:///` or no schema will return the default 
LocalFS store
-    /// - URL with scheme `s3://bucket/` will return the S3 store if it's 
registered
-    /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store 
if it's registered
-    ///
-    pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn 
ObjectStore>> {
-        let url = url.as_ref();
-        // First check whether can get object store from registry
-        let s = &url[url::Position::BeforeScheme..url::Position::BeforePath];
-        let store = self.object_stores.get(s).map(|o| o.value().clone());
-
-        match store {
-            Some(store) => Ok(store),
-            None => match &self.provider {
-                Some(provider) => {
-                    let store = provider.get_by_url(url)?;
-                    let key =
-                        
&url[url::Position::BeforeScheme..url::Position::BeforePath];
-                    self.object_stores.insert(key.to_owned(), store.clone());
-                    Ok(store)
-                }
-                None => Err(DataFusionError::Internal(format!(
+    fn get_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
+        let s = get_url_key(url);
+        self.object_stores
+            .get(&s)
+            .map(|o| o.value().clone())
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
                     "No suitable object store found for {url}"
-                ))),
-            },
-        }
+                ))
+            })
     }
 }
 
+/// Get the key of a url for object store registration.
+/// The credential info will be removed
+fn get_url_key(url: &Url) -> String {
+    format!(
+        "{}://{}",
+        url.scheme(),
+        &url[url::Position::BeforeHost..url::Position::AfterPort],
+    )
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -259,4 +247,19 @@ mod tests {
             
ObjectStoreUrl::parse("s3://username:password@host:123/foo").unwrap_err();
         assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only 
contain scheme and authority, got: /foo");
     }
+
+    #[test]
+    fn test_get_url_key() {
+        let file = ObjectStoreUrl::parse("file://").unwrap();
+        let key = get_url_key(&file.url);
+        assert_eq!(key.as_str(), "file://");
+
+        let url = ObjectStoreUrl::parse("s3://bucket").unwrap();
+        let key = get_url_key(&url.url);
+        assert_eq!(key.as_str(), "s3://bucket");
+
+        let url = 
ObjectStoreUrl::parse("s3://username:password@host:123").unwrap();
+        let key = get_url_key(&url.url);
+        assert_eq!(key.as_str(), "s3://host:123");
+    }
 }
diff --git a/datafusion/execution/src/runtime_env.rs 
b/datafusion/execution/src/runtime_env.rs
index 3163e0e03e..67736edf68 100644
--- a/datafusion/execution/src/runtime_env.rs
+++ b/datafusion/execution/src/runtime_env.rs
@@ -21,7 +21,7 @@
 use crate::{
     disk_manager::{DiskManager, DiskManagerConfig},
     memory_pool::{GreedyMemoryPool, MemoryPool, UnboundedMemoryPool},
-    object_store::ObjectStoreRegistry,
+    object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
 };
 
 use datafusion_common::{DataFusionError, Result};
@@ -39,7 +39,7 @@ pub struct RuntimeEnv {
     /// Manage temporary files during query execution
     pub disk_manager: Arc<DiskManager>,
     /// Object Store Registry
-    pub object_store_registry: Arc<ObjectStoreRegistry>,
+    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
 }
 
 impl Debug for RuntimeEnv {
@@ -67,10 +67,9 @@ impl RuntimeEnv {
         })
     }
 
-    /// Registers a custom `ObjectStore` to be used when accessing a
-    /// specific scheme and host. This allows DataFusion to create
-    /// external tables from urls that do not have built in support
-    /// such as `hdfs://...`.
+    /// Registers a custom `ObjectStore` to be used with a specific url.
+    /// This allows DataFusion to create external tables from urls that do not 
have
+    /// built in support such as `hdfs://namenode:port/...`.
     ///
     /// Returns the [`ObjectStore`] previously registered for this
     /// scheme, if any.
@@ -78,20 +77,18 @@ impl RuntimeEnv {
     /// See [`ObjectStoreRegistry`] for more details
     pub fn register_object_store(
         &self,
-        scheme: impl AsRef<str>,
-        host: impl AsRef<str>,
+        url: &Url,
         object_store: Arc<dyn ObjectStore>,
     ) -> Option<Arc<dyn ObjectStore>> {
-        self.object_store_registry
-            .register_store(scheme, host, object_store)
+        self.object_store_registry.register_store(url, object_store)
     }
 
     /// Retrieves a `ObjectStore` instance for a url by consulting the
-    /// registery. See [`ObjectStoreRegistry::get_by_url`] for more
+    /// registry. See [`ObjectStoreRegistry::get_store`] for more
     /// details.
     pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn 
ObjectStore>> {
         self.object_store_registry
-            .get_by_url(url)
+            .get_store(url.as_ref())
             .map_err(DataFusionError::from)
     }
 }
@@ -102,7 +99,7 @@ impl Default for RuntimeEnv {
     }
 }
 
-#[derive(Clone, Default)]
+#[derive(Clone)]
 /// Execution runtime configuration
 pub struct RuntimeConfig {
     /// DiskManager to manage temporary disk file usage
@@ -112,13 +109,23 @@ pub struct RuntimeConfig {
     /// Defaults to using an [`UnboundedMemoryPool`] if `None`
     pub memory_pool: Option<Arc<dyn MemoryPool>>,
     /// ObjectStoreRegistry to get object store based on url
-    pub object_store_registry: Arc<ObjectStoreRegistry>,
+    pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
+}
+
+impl Default for RuntimeConfig {
+    fn default() -> Self {
+        Self::new()
+    }
 }
 
 impl RuntimeConfig {
     /// New with default values
     pub fn new() -> Self {
-        Default::default()
+        Self {
+            disk_manager: Default::default(),
+            memory_pool: Default::default(),
+            object_store_registry: 
Arc::new(DefaultObjectStoreRegistry::default()),
+        }
     }
 
     /// Customize disk manager
@@ -136,7 +143,7 @@ impl RuntimeConfig {
     /// Customize object store registry
     pub fn with_object_store_registry(
         mut self,
-        object_store_registry: Arc<ObjectStoreRegistry>,
+        object_store_registry: Arc<dyn ObjectStoreRegistry>,
     ) -> Self {
         self.object_store_registry = object_store_registry;
         self

Reply via email to