tustvold commented on code in PR #5543:
URL: https://github.com/apache/arrow-datafusion/pull/5543#discussion_r1143247554
##########
datafusion/execution/src/object_store.rs:
##########
@@ -156,78 +158,64 @@ impl std::fmt::Debug for ObjectStoreRegistry {
}
}
-impl Default for ObjectStoreRegistry {
+impl Default for DefaultObjectStoreRegistry {
fn default() -> Self {
Self::new()
}
}
-impl ObjectStoreRegistry {
- /// Create an [`ObjectStoreRegistry`] with no [`ObjectStoreProvider`].
- ///
- /// This will register [`LocalFileSystem`] to handle `file://` paths,
further stores
- /// will need to be explicitly registered with calls to
[`ObjectStoreRegistry::register_store`]
+impl DefaultObjectStoreRegistry {
+ /// This will register [`LocalFileSystem`] to handle `file://` paths
pub fn new() -> Self {
- ObjectStoreRegistry::new_with_provider(None)
- }
-
- /// Create an [`ObjectStoreRegistry`] with the provided
[`ObjectStoreProvider`]
- ///
- /// This will register [`LocalFileSystem`] to handle `file://` paths,
further stores
- /// may be explicity registered with calls to
[`ObjectStoreRegistry::register_store`] or
- /// created lazily, on-demand by the provided [`ObjectStoreProvider`]
- pub fn new_with_provider(provider: Option<Arc<dyn ObjectStoreProvider>>)
-> Self {
let object_stores: DashMap<String, Arc<dyn ObjectStore>> =
DashMap::new();
- object_stores.insert("file://".to_string(),
Arc::new(LocalFileSystem::new()));
- Self {
- object_stores,
- provider,
- }
+ let url = Url::parse("file://").unwrap();
+ let key = get_url_key(&url);
+ object_stores.insert(key, Arc::new(LocalFileSystem::new()));
+ Self { object_stores }
}
+}
- /// Adds a new store to this registry.
- ///
- /// If a store with the same schema and host existed before, it is
replaced and returned
- pub fn register_store(
+impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
+ fn register_store(
&self,
- scheme: impl AsRef<str>,
- host: impl AsRef<str>,
+ url: &Url,
store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore>> {
- let s = format!("{}://{}", scheme.as_ref(), host.as_ref());
+ let s = get_url_key(url);
self.object_stores.insert(s, store)
}
- /// Get a suitable store for the provided URL. For example:
- ///
- /// - URL with scheme `file:///` or no schema will return the default
LocalFS store
- /// - URL with scheme `s3://bucket/` will return the S3 store if it's
registered
- /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store
if it's registered
- ///
- pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn
ObjectStore>> {
- let url = url.as_ref();
- // First check whether can get object store from registry
- let s = &url[url::Position::BeforeScheme..url::Position::BeforePath];
- let store = self.object_stores.get(s).map(|o| o.value().clone());
-
- match store {
- Some(store) => Ok(store),
- None => match &self.provider {
- Some(provider) => {
- let store = provider.get_by_url(url)?;
- let key =
-
&url[url::Position::BeforeScheme..url::Position::BeforePath];
- self.object_stores.insert(key.to_owned(), store.clone());
- Ok(store)
- }
- None => Err(DataFusionError::Internal(format!(
+ /// The [`DefaultObjectStoreRegistry`] will only depend on the inner
object store cache
+ /// to decide whether it's able to find an [`ObjectStore`] for a url. No
ad-hoc discovery
+ /// and lazy registration will be executed.
+ fn get_or_lazy_register_store(&self, url: &Url) -> Result<Arc<dyn
ObjectStore>> {
+ let s = get_url_key(url);
+ self.object_stores
+ .get(&s)
+ .map(|o| o.value().clone())
+ .ok_or_else(|| {
+ DataFusionError::Internal(format!(
"No suitable object store found for {url}"
- ))),
- },
- }
+ ))
+ })
}
}
+/// Get the key of a url for object store registration.
+/// The credential info will be removed
+fn get_url_key(url: &Url) -> String {
+ let port_info = url
+ .port()
+ .map(|port| format!(":{port}"))
+ .unwrap_or(String::new());
+ format!(
+ "{}://{}{}/",
+ &url[url::Position::BeforeScheme..url::Position::AfterScheme],
+ &url[url::Position::BeforeHost..url::Position::AfterHost],
+ port_info
+ )
Review Comment:
```suggestion
format!(
"{}://{}",
url.scheme(),
&url[url::Position::BeforeHost..url::Position::AfterPort],
)
```
Note: this also removes the trailing `/`, which I _think_ shouldn't be part of the key
##########
datafusion/execution/src/object_store.rs:
##########
@@ -132,18 +115,37 @@ pub trait ObjectStoreProvider: Send + Sync + 'static {
/// buckets using [`ObjectStoreRegistry::register_store`]
///
/// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can
create [`ObjectStore`]
-/// lazily, on-demand using [`ObjectStoreProvider`]
+/// lazily
///
/// [`ListingTableUrl`]: crate::datasource::listing::ListingTableUrl
-pub struct ObjectStoreRegistry {
+pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
+ /// If a store with the same key existed before, it is replaced and
returned
+ fn register_store(
+ &self,
+ url: &Url,
+ store: Arc<dyn ObjectStore>,
+ ) -> Option<Arc<dyn ObjectStore>>;
+
+ /// Get a suitable store for the provided URL. For example:
+ ///
+ /// - URL with scheme `file:///` or no schema will return the default
LocalFS store
+ /// - URL with scheme `s3://bucket/` will return the S3 store
+ /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store
+ ///
+ /// If no [`ObjectStore`] found for the `url`, ad-hoc discovery may be
executed depending on
+ /// the `url`. An [`ObjectStore`] may be lazily created and registered.
Review Comment:
```suggestion
/// If no [`ObjectStore`] is found for the `url`, ad-hoc discovery may be
executed depending on
/// the `url` and the [`ObjectStoreRegistry`] implementation
```
##########
datafusion/execution/src/object_store.rs:
##########
@@ -156,78 +158,64 @@ impl std::fmt::Debug for ObjectStoreRegistry {
}
}
-impl Default for ObjectStoreRegistry {
+impl Default for DefaultObjectStoreRegistry {
fn default() -> Self {
Self::new()
}
}
-impl ObjectStoreRegistry {
- /// Create an [`ObjectStoreRegistry`] with no [`ObjectStoreProvider`].
- ///
- /// This will register [`LocalFileSystem`] to handle `file://` paths,
further stores
- /// will need to be explicitly registered with calls to
[`ObjectStoreRegistry::register_store`]
+impl DefaultObjectStoreRegistry {
+ /// This will register [`LocalFileSystem`] to handle `file://` paths
pub fn new() -> Self {
- ObjectStoreRegistry::new_with_provider(None)
- }
-
- /// Create an [`ObjectStoreRegistry`] with the provided
[`ObjectStoreProvider`]
- ///
- /// This will register [`LocalFileSystem`] to handle `file://` paths,
further stores
- /// may be explicity registered with calls to
[`ObjectStoreRegistry::register_store`] or
- /// created lazily, on-demand by the provided [`ObjectStoreProvider`]
- pub fn new_with_provider(provider: Option<Arc<dyn ObjectStoreProvider>>)
-> Self {
let object_stores: DashMap<String, Arc<dyn ObjectStore>> =
DashMap::new();
- object_stores.insert("file://".to_string(),
Arc::new(LocalFileSystem::new()));
- Self {
- object_stores,
- provider,
- }
+ let url = Url::parse("file://").unwrap();
+ let key = get_url_key(&url);
+ object_stores.insert(key, Arc::new(LocalFileSystem::new()));
Review Comment:
```suggestion
object_stores.insert("file://".to_string(),
Arc::new(LocalFileSystem::new()));
```
See the comment on `get_url_key` about removing the trailing `/`
##########
datafusion/execution/src/object_store.rs:
##########
@@ -132,18 +115,37 @@ pub trait ObjectStoreProvider: Send + Sync + 'static {
/// buckets using [`ObjectStoreRegistry::register_store`]
///
/// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can
create [`ObjectStore`]
-/// lazily, on-demand using [`ObjectStoreProvider`]
+/// lazily
Review Comment:
```suggestion
/// lazily by providing a custom implementation of [`ObjectStoreRegistry`]
```
##########
datafusion-cli/src/object_storage.rs:
##########
@@ -43,28 +46,52 @@ impl FromStr for ObjectStoreScheme {
}
}
-#[derive(Debug)]
-pub struct DatafusionCliObjectStoreProvider {}
+#[derive(Debug, Default)]
Review Comment:
```suggestion
/// An [`ObjectStoreRegistry`] that can automatically create S3 and GCS
stores for a given URL
#[derive(Debug, Default)]
```
##########
datafusion/execution/src/object_store.rs:
##########
@@ -132,18 +115,37 @@ pub trait ObjectStoreProvider: Send + Sync + 'static {
/// buckets using [`ObjectStoreRegistry::register_store`]
///
/// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can
create [`ObjectStore`]
-/// lazily, on-demand using [`ObjectStoreProvider`]
+/// lazily
///
/// [`ListingTableUrl`]: crate::datasource::listing::ListingTableUrl
-pub struct ObjectStoreRegistry {
+pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
+ /// If a store with the same key existed before, it is replaced and
returned
+ fn register_store(
+ &self,
+ url: &Url,
+ store: Arc<dyn ObjectStore>,
+ ) -> Option<Arc<dyn ObjectStore>>;
+
+ /// Get a suitable store for the provided URL. For example:
+ ///
+ /// - URL with scheme `file:///` or no schema will return the default
LocalFS store
+ /// - URL with scheme `s3://bucket/` will return the S3 store
+ /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store
Review Comment:
```suggestion
/// Get a suitable store for the provided URL
```
These examples are outdated now
##########
datafusion/execution/src/object_store.rs:
##########
@@ -156,78 +158,64 @@ impl std::fmt::Debug for ObjectStoreRegistry {
}
}
-impl Default for ObjectStoreRegistry {
+impl Default for DefaultObjectStoreRegistry {
fn default() -> Self {
Self::new()
}
}
-impl ObjectStoreRegistry {
- /// Create an [`ObjectStoreRegistry`] with no [`ObjectStoreProvider`].
- ///
- /// This will register [`LocalFileSystem`] to handle `file://` paths,
further stores
- /// will need to be explicitly registered with calls to
[`ObjectStoreRegistry::register_store`]
+impl DefaultObjectStoreRegistry {
+ /// This will register [`LocalFileSystem`] to handle `file://` paths
pub fn new() -> Self {
- ObjectStoreRegistry::new_with_provider(None)
- }
-
- /// Create an [`ObjectStoreRegistry`] with the provided
[`ObjectStoreProvider`]
- ///
- /// This will register [`LocalFileSystem`] to handle `file://` paths,
further stores
- /// may be explicity registered with calls to
[`ObjectStoreRegistry::register_store`] or
- /// created lazily, on-demand by the provided [`ObjectStoreProvider`]
- pub fn new_with_provider(provider: Option<Arc<dyn ObjectStoreProvider>>)
-> Self {
let object_stores: DashMap<String, Arc<dyn ObjectStore>> =
DashMap::new();
- object_stores.insert("file://".to_string(),
Arc::new(LocalFileSystem::new()));
- Self {
- object_stores,
- provider,
- }
+ let url = Url::parse("file://").unwrap();
+ let key = get_url_key(&url);
+ object_stores.insert(key, Arc::new(LocalFileSystem::new()));
+ Self { object_stores }
}
+}
- /// Adds a new store to this registry.
- ///
- /// If a store with the same schema and host existed before, it is
replaced and returned
- pub fn register_store(
+impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
+ fn register_store(
&self,
- scheme: impl AsRef<str>,
- host: impl AsRef<str>,
+ url: &Url,
store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore>> {
- let s = format!("{}://{}", scheme.as_ref(), host.as_ref());
+ let s = get_url_key(url);
self.object_stores.insert(s, store)
}
- /// Get a suitable store for the provided URL. For example:
- ///
- /// - URL with scheme `file:///` or no schema will return the default
LocalFS store
- /// - URL with scheme `s3://bucket/` will return the S3 store if it's
registered
- /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store
if it's registered
- ///
- pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn
ObjectStore>> {
- let url = url.as_ref();
- // First check whether can get object store from registry
- let s = &url[url::Position::BeforeScheme..url::Position::BeforePath];
- let store = self.object_stores.get(s).map(|o| o.value().clone());
-
- match store {
- Some(store) => Ok(store),
- None => match &self.provider {
- Some(provider) => {
- let store = provider.get_by_url(url)?;
- let key =
-
&url[url::Position::BeforeScheme..url::Position::BeforePath];
- self.object_stores.insert(key.to_owned(), store.clone());
- Ok(store)
- }
- None => Err(DataFusionError::Internal(format!(
+ /// The [`DefaultObjectStoreRegistry`] will only depend on the inner
object store cache
+ /// to decide whether it's able to find an [`ObjectStore`] for a url. No
ad-hoc discovery
+ /// and lazy registration will be executed.
Review Comment:
```suggestion
```
These docs aren't rendered by docs.rs, so I moved this onto the type definition
and reworded it slightly
##########
datafusion-cli/src/object_storage.rs:
##########
@@ -43,28 +46,52 @@ impl FromStr for ObjectStoreScheme {
}
}
-#[derive(Debug)]
-pub struct DatafusionCliObjectStoreProvider {}
+#[derive(Debug, Default)]
+pub struct DatafusionCliObjectStoreRegistry {
+ inner: DefaultObjectStoreRegistry,
+}
+
+impl DatafusionCliObjectStoreRegistry {
+ pub fn new() -> Self {
+ Default::default()
+ }
+}
+
+impl ObjectStoreRegistry for DatafusionCliObjectStoreRegistry {
+ fn register_store(
+ &self,
+ url: &Url,
+ store: Arc<dyn ObjectStore>,
+ ) -> Option<Arc<dyn ObjectStore>> {
+ self.inner.register_store(url, store)
+ }
+
+ fn get_or_lazy_register_store(&self, url: &Url) -> Result<Arc<dyn
ObjectStore>> {
Review Comment:
```suggestion
fn get_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
```
Lazy creation is an implementation detail of certain `ObjectStoreRegistry`
implementations; I'm not sure it needs to be encoded in the name
##########
datafusion/execution/src/object_store.rs:
##########
@@ -132,18 +115,37 @@ pub trait ObjectStoreProvider: Send + Sync + 'static {
/// buckets using [`ObjectStoreRegistry::register_store`]
///
/// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can
create [`ObjectStore`]
-/// lazily, on-demand using [`ObjectStoreProvider`]
+/// lazily
///
/// [`ListingTableUrl`]: crate::datasource::listing::ListingTableUrl
-pub struct ObjectStoreRegistry {
+pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
+ /// If a store with the same key existed before, it is replaced and
returned
+ fn register_store(
+ &self,
+ url: &Url,
+ store: Arc<dyn ObjectStore>,
+ ) -> Option<Arc<dyn ObjectStore>>;
+
+ /// Get a suitable store for the provided URL. For example:
+ ///
+ /// - URL with scheme `file:///` or no schema will return the default
LocalFS store
+ /// - URL with scheme `s3://bucket/` will return the S3 store
+ /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store
+ ///
+ /// If no [`ObjectStore`] found for the `url`, ad-hoc discovery may be
executed depending on
+ /// the `url`. An [`ObjectStore`] may be lazily created and registered.
+ fn get_or_lazy_register_store(&self, url: &Url) -> Result<Arc<dyn
ObjectStore>>;
+}
+
+/// The default [`ObjectStoreRegistry`]
+pub struct DefaultObjectStoreRegistry {
Review Comment:
```suggestion
///
/// Stores are registered based on the scheme, host and port of the provided
URL
/// with a [`LocalFileSystem::new`] automatically registered for `file://`
///
/// For example:
///
/// - `file:///my_path` will return the default LocalFS store
/// - `s3://bucket/path` will return a store registered with `s3://bucket`
if any
/// - `hdfs://host:port/path` will return a store registered with
`hdfs://host:port` if any
pub struct DefaultObjectStoreRegistry {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]