This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3a8b5dae14 Change ObjectStoreRegistry from struct to trait to provide polymorphism (#5543)
3a8b5dae14 is described below
commit 3a8b5dae14aac21712281d1ff4e9737cbfd35813
Author: yahoNanJing <[email protected]>
AuthorDate: Wed Mar 22 11:39:08 2023 +0800
Change ObjectStoreRegistry from struct to trait to provide polymorphism (#5543)
* Introduce ObjectStoreManager trait for the ObjectStoreRegistry to provide polymorphism for get_by_url
* Change ObjectStoreRegistry from struct to trait
* Fix for review comments
* Minor fix
* Use url instead of &str as the parameter for register_store
* Remove register_with_scheme_and_host
* Minor fix
* Fix for review comments
---------
Co-authored-by: yangzhong <[email protected]>
---
datafusion-cli/src/main.rs | 7 +-
datafusion-cli/src/object_storage.rs | 90 ++++++++----
datafusion-examples/Cargo.toml | 1 +
datafusion-examples/examples/query-aws-s3.rs | 5 +-
datafusion/core/src/datasource/listing/mod.rs | 39 ++---
.../core/src/physical_plan/file_format/avro.rs | 4 +-
.../core/src/physical_plan/file_format/csv.rs | 8 +-
.../core/src/physical_plan/file_format/json.rs | 4 +-
datafusion/core/src/test/object_store.rs | 4 +-
datafusion/core/tests/path_partition.rs | 9 +-
datafusion/execution/src/object_store.rs | 159 +++++++++++----------
datafusion/execution/src/runtime_env.rs | 39 ++---
12 files changed, 210 insertions(+), 159 deletions(-)
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 6d7b29837c..8061f4d516 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -16,13 +16,12 @@
// under the License.
use clap::Parser;
-use datafusion::datasource::object_store::ObjectStoreRegistry;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicFileCatalog;
-use datafusion_cli::object_storage::DatafusionCliObjectStoreProvider;
+use datafusion_cli::object_storage::DatafusionCliObjectStoreRegistry;
use datafusion_cli::{
exec, print_format::PrintFormat, print_options::PrintOptions,
DATAFUSION_CLI_VERSION,
};
@@ -149,9 +148,7 @@ pub async fn main() -> Result<()> {
}
fn create_runtime_env() -> Result<RuntimeEnv> {
- let object_store_provider = DatafusionCliObjectStoreProvider {};
- let object_store_registry =
- ObjectStoreRegistry::new_with_provider(Some(Arc::new(object_store_provider)));
+ let object_store_registry = DatafusionCliObjectStoreRegistry::new();
let rn_config =
RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry));
RuntimeEnv::new(rn_config)
diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs
index 56401c42f5..46a42c927b 100644
--- a/datafusion-cli/src/object_storage.rs
+++ b/datafusion-cli/src/object_storage.rs
@@ -18,8 +18,11 @@
use datafusion::error::Result;
use std::{env, str::FromStr, sync::Arc};
-use datafusion::{datasource::object_store::ObjectStoreProvider, error::DataFusionError};
-use object_store::{aws::AmazonS3Builder, gcp::GoogleCloudStorageBuilder};
+use datafusion::datasource::object_store::{
+ DefaultObjectStoreRegistry, ObjectStoreRegistry,
+};
+use datafusion::error::DataFusionError;
+use object_store::{aws::AmazonS3Builder, gcp::GoogleCloudStorageBuilder, ObjectStore};
use url::Url;
#[derive(Debug, PartialEq, Eq, clap::ArgEnum, Clone)]
@@ -43,20 +46,45 @@ impl FromStr for ObjectStoreScheme {
}
}
-#[derive(Debug)]
-pub struct DatafusionCliObjectStoreProvider {}
+/// An [`ObjectStoreRegistry`] that can automatically create S3 and GCS stores for a given URL
+#[derive(Debug, Default)]
+pub struct DatafusionCliObjectStoreRegistry {
+ inner: DefaultObjectStoreRegistry,
+}
+
+impl DatafusionCliObjectStoreRegistry {
+ pub fn new() -> Self {
+ Default::default()
+ }
+}
+
+impl ObjectStoreRegistry for DatafusionCliObjectStoreRegistry {
+ fn register_store(
+ &self,
+ url: &Url,
+ store: Arc<dyn ObjectStore>,
+ ) -> Option<Arc<dyn ObjectStore>> {
+ self.inner.register_store(url, store)
+ }
+
+ fn get_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
+ self.inner.get_store(url).or_else(|_| {
+ let store =
+ ObjectStoreScheme::from_str(url.scheme()).map(
+ |scheme| match scheme {
+ ObjectStoreScheme::S3 => build_s3_object_store(url),
+ ObjectStoreScheme::GCS => build_gcs_object_store(url),
+ },
+ )??;
+
+ self.inner.register_store(url, store.clone());
-/// ObjectStoreProvider for S3 and GCS
-impl ObjectStoreProvider for DatafusionCliObjectStoreProvider {
- fn get_by_url(&self, url: &Url) -> Result<Arc<dyn object_store::ObjectStore>> {
- ObjectStoreScheme::from_str(url.scheme()).map(|scheme| match scheme {
- ObjectStoreScheme::S3 => build_s3_object_store(url),
- ObjectStoreScheme::GCS => build_gcs_object_store(url),
- })?
+ Ok(store)
+ })
}
}
-fn build_s3_object_store(url: &Url) -> Result<Arc<dyn object_store::ObjectStore>> {
+fn build_s3_object_store(url: &Url) -> Result<Arc<dyn ObjectStore>> {
let host = get_host_name(url)?;
match AmazonS3Builder::from_env().with_bucket_name(host).build() {
Ok(s3) => Ok(Arc::new(s3)),
@@ -64,7 +92,7 @@ fn build_s3_object_store(url: &Url) -> Result<Arc<dyn object_store::ObjectStore>
}
}
-fn build_gcs_object_store(url: &Url) -> Result<Arc<dyn object_store::ObjectStore>> {
+fn build_gcs_object_store(url: &Url) -> Result<Arc<dyn ObjectStore>> {
let host = get_host_name(url)?;
let mut builder = GoogleCloudStorageBuilder::new().with_bucket_name(host);
@@ -90,17 +118,17 @@ fn get_host_name(url: &Url) -> Result<&str> {
mod tests {
use std::{env, str::FromStr};
- use datafusion::datasource::object_store::ObjectStoreProvider;
+ use datafusion::datasource::object_store::ObjectStoreRegistry;
use url::Url;
- use super::DatafusionCliObjectStoreProvider;
+ use super::DatafusionCliObjectStoreRegistry;
#[test]
fn s3_provider_no_host() {
let no_host_url = "s3:///";
- let provider = DatafusionCliObjectStoreProvider {};
- let err = provider
- .get_by_url(&Url::from_str(no_host_url).unwrap())
+ let registry = DatafusionCliObjectStoreRegistry::new();
+ let err = registry
+ .get_store(&Url::from_str(no_host_url).unwrap())
.unwrap_err();
assert!(err
.to_string()
@@ -110,9 +138,9 @@ mod tests {
#[test]
fn gs_provider_no_host() {
let no_host_url = "gs:///";
- let provider = DatafusionCliObjectStoreProvider {};
- let err = provider
- .get_by_url(&Url::from_str(no_host_url).unwrap())
+ let registry = DatafusionCliObjectStoreRegistry::new();
+ let err = registry
+ .get_store(&Url::from_str(no_host_url).unwrap())
.unwrap_err();
assert!(err
.to_string()
@@ -122,9 +150,9 @@ mod tests {
#[test]
fn gcs_provider_no_host() {
let no_host_url = "gcs:///";
- let provider = DatafusionCliObjectStoreProvider {};
- let err = provider
- .get_by_url(&Url::from_str(no_host_url).unwrap())
+ let registry = DatafusionCliObjectStoreRegistry::new();
+ let err = registry
+ .get_store(&Url::from_str(no_host_url).unwrap())
.unwrap_err();
assert!(err
.to_string()
@@ -134,9 +162,9 @@ mod tests {
#[test]
fn unknown_object_store_type() {
let unknown = "unknown://bucket_name/path";
- let provider = DatafusionCliObjectStoreProvider {};
- let err = provider
- .get_by_url(&Url::from_str(unknown).unwrap())
+ let registry = DatafusionCliObjectStoreRegistry::new();
+ let err = registry
+ .get_store(&Url::from_str(unknown).unwrap())
.unwrap_err();
assert!(err
.to_string()
@@ -146,15 +174,15 @@ mod tests {
#[test]
fn s3_region_validation() {
let s3 = "s3://bucket_name/path";
- let provider = DatafusionCliObjectStoreProvider {};
- let err = provider
- .get_by_url(&Url::from_str(s3).unwrap())
+ let registry = DatafusionCliObjectStoreRegistry::new();
+ let err = registry
+ .get_store(&Url::from_str(s3).unwrap())
.unwrap_err();
assert!(err.to_string().contains("Generic S3 error: Missing region"));
env::set_var("AWS_REGION", "us-east-1");
let url = Url::from_str(s3).expect("Unable to parse s3 url");
- let res = provider.get_by_url(&url);
+ let res = registry.get_store(&url);
let msg = match res {
Err(e) => format!("{e}"),
Ok(_) => "".to_string(),
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 613d3e934a..6e5c62ce82 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -57,4 +57,5 @@ serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.82"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
tonic = "0.8"
+url = "2.2"
uuid = "1.2"
diff --git a/datafusion-examples/examples/query-aws-s3.rs b/datafusion-examples/examples/query-aws-s3.rs
index 3788167312..cbb6486b4e 100644
--- a/datafusion-examples/examples/query-aws-s3.rs
+++ b/datafusion-examples/examples/query-aws-s3.rs
@@ -20,6 +20,7 @@ use datafusion::prelude::*;
use object_store::aws::AmazonS3Builder;
use std::env;
use std::sync::Arc;
+use url::Url;
/// This example demonstrates querying data in an S3 bucket.
///
@@ -45,8 +46,10 @@ async fn main() -> Result<()> {
.with_secret_access_key(env::var("AWS_SECRET_ACCESS_KEY").unwrap())
.build()?;
+ let path = format!("s3://{bucket_name}");
+ let s3_url = Url::parse(&path).unwrap();
ctx.runtime_env()
- .register_object_store("s3", bucket_name, Arc::new(s3));
+ .register_object_store(&s3_url, Arc::new(s3));
// cannot query the parquet files from this bucket because the path contains a whitespace
// and we don't support that yet
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index 1e6b40a348..c7a1761151 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -115,10 +115,13 @@ impl From<ObjectMeta> for PartitionedFile {
#[cfg(test)]
mod tests {
- use datafusion_execution::object_store::ObjectStoreRegistry;
+ use crate::datasource::listing::ListingTableUrl;
+ use datafusion_execution::object_store::{
+ DefaultObjectStoreRegistry, ObjectStoreRegistry,
+ };
use object_store::local::LocalFileSystem;
-
- use super::*;
+ use std::sync::Arc;
+ use url::Url;
#[test]
fn test_object_store_listing_url() {
@@ -132,32 +135,34 @@ mod tests {
}
#[test]
- fn test_get_by_url_hdfs() {
- let sut = ObjectStoreRegistry::default();
- sut.register_store("hdfs", "localhost:8020", Arc::new(LocalFileSystem::new()));
+ fn test_get_store_hdfs() {
+ let sut = DefaultObjectStoreRegistry::default();
+ let url = Url::parse("hdfs://localhost:8020").unwrap();
+ sut.register_store(&url, Arc::new(LocalFileSystem::new()));
let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
- sut.get_by_url(&url).unwrap();
+ sut.get_store(url.as_ref()).unwrap();
}
#[test]
- fn test_get_by_url_s3() {
- let sut = ObjectStoreRegistry::default();
- sut.register_store("s3", "bucket", Arc::new(LocalFileSystem::new()));
+ fn test_get_store_s3() {
+ let sut = DefaultObjectStoreRegistry::default();
+ let url = Url::parse("s3://bucket/key").unwrap();
+ sut.register_store(&url, Arc::new(LocalFileSystem::new()));
let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
- sut.get_by_url(&url).unwrap();
+ sut.get_store(url.as_ref()).unwrap();
}
#[test]
- fn test_get_by_url_file() {
- let sut = ObjectStoreRegistry::default();
+ fn test_get_store_file() {
+ let sut = DefaultObjectStoreRegistry::default();
let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
- sut.get_by_url(&url).unwrap();
+ sut.get_store(url.as_ref()).unwrap();
}
#[test]
- fn test_get_by_url_local() {
- let sut = ObjectStoreRegistry::default();
+ fn test_get_store_local() {
+ let sut = DefaultObjectStoreRegistry::default();
let url = ListingTableUrl::parse("../").unwrap();
- sut.get_by_url(&url).unwrap();
+ sut.get_store(url.as_ref()).unwrap();
}
}
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index e4fc570feb..ed27dfac03 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -221,6 +221,7 @@ mod tests {
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
use rstest::*;
+ use url::Url;
use super::*;
@@ -245,9 +246,10 @@ mod tests {
let session_ctx = SessionContext::new();
let state = session_ctx.state();
+ let url = Url::parse("file://").unwrap();
state
.runtime_env()
- .register_object_store("file", "", store.clone());
+ .register_object_store(&url, store.clone());
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{testdata}/avro/alltypes_plain.avro");
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 3fc7df4f10..d9075c84ad 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -337,6 +337,7 @@ mod tests {
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
+ use url::Url;
#[rstest(
file_compression_type,
@@ -656,8 +657,8 @@ mod tests {
store: Arc<dyn ObjectStore>,
) {
let ctx = SessionContext::new();
- ctx.runtime_env()
- .register_object_store("file", "", store.clone());
+ let url = Url::parse("file://").unwrap();
+ ctx.runtime_env().register_object_store(&url, store.clone());
let task_ctx = ctx.task_ctx();
@@ -717,9 +718,10 @@ mod tests {
let path = object_store::path::Path::from("a.csv");
store.put(&path, data).await.unwrap();
+ let url = Url::parse("memory://").unwrap();
session_ctx
.runtime_env()
- .register_object_store("memory", "", Arc::new(store));
+ .register_object_store(&url, Arc::new(store));
let df = session_ctx
.read_csv("memory:///", CsvReadOptions::new())
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 146227335c..ebbae74178 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -331,8 +331,8 @@ mod tests {
store: Arc<dyn ObjectStore>,
) {
let ctx = SessionContext::new();
- ctx.runtime_env()
- .register_object_store("file", "", store.clone());
+ let url = Url::parse("file://").unwrap();
+ ctx.runtime_env().register_object_store(&url, store.clone());
let filename = "1.json";
let file_groups = partitioned_file_groups(
TEST_DATA_BASE,
diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs
index e61653e882..3e2d1fdae0 100644
--- a/datafusion/core/src/test/object_store.rs
+++ b/datafusion/core/src/test/object_store.rs
@@ -19,11 +19,13 @@ use crate::prelude::SessionContext;
use futures::FutureExt;
use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
use std::sync::Arc;
+use url::Url;
/// Returns a test object store with the provided `ctx`
pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
+ let url = Url::parse("test://").unwrap();
ctx.runtime_env()
- .register_object_store("test", "", make_test_store(files));
+ .register_object_store(&url, make_test_store(files));
}
/// Create a test object store with the provided files
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index 6c7f1431af..b93ad02aa2 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -45,6 +45,7 @@ use object_store::{
path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
};
use tokio::io::AsyncWrite;
+use url::Url;
#[tokio::test]
async fn parquet_distinct_partition_col() -> Result<()> {
@@ -523,9 +524,9 @@ fn register_partitioned_aggregate_csv(
let testdata = arrow_test_data();
let csv_file_path = format!("{testdata}/csv/aggregate_test_100.csv");
let file_schema = test_util::aggr_test_schema();
+ let url = Url::parse("mirror://").unwrap();
ctx.runtime_env().register_object_store(
- "mirror",
- "",
+ &url,
MirroringObjectStore::new_arc(csv_file_path, store_paths),
);
@@ -556,9 +557,9 @@ async fn register_partitioned_alltypes_parquet(
) {
let testdata = parquet_test_data();
let parquet_file_path = format!("{testdata}/{source_file}");
+ let url = Url::parse("mirror://").unwrap();
ctx.runtime_env().register_object_store(
- "mirror",
- "",
+ &url,
MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths),
);
diff --git a/datafusion/execution/src/object_store.rs b/datafusion/execution/src/object_store.rs
index 9b958e3023..c22ec34a60 100644
--- a/datafusion/execution/src/object_store.rs
+++ b/datafusion/execution/src/object_store.rs
@@ -79,21 +79,6 @@ impl std::fmt::Display for ObjectStoreUrl {
}
}
-/// Provides a mechanism for lazy, on-demand creation of an [`ObjectStore`]
-///
-/// For example, to support reading arbitrary buckets from AWS S3
-/// without instantiating an [`ObjectStore`] for each possible bucket
-/// up front, an [`ObjectStoreProvider`] can be used to create the
-/// appropriate [`ObjectStore`] instance on demand.
-///
-/// See [`ObjectStoreRegistry::new_with_provider`]
-pub trait ObjectStoreProvider: Send + Sync + 'static {
- /// Return an ObjectStore for the provided url, called by [`ObjectStoreRegistry::get_by_url`]
- /// when no matching store has already been registered. The result will be cached based
- /// on its schema and authority. Any error will be returned to the caller
- fn get_by_url(&self, url: &Url) -> Result<Arc<dyn ObjectStore>>;
-}
-
/// [`ObjectStoreRegistry`] maps a URL to an [`ObjectStore`] instance,
/// and allows DataFusion to read from different [`ObjectStore`]
/// instances. For example DataFusion might be configured so that
@@ -113,15 +98,13 @@ pub trait ObjectStoreProvider: Send + Sync + 'static {
/// ```
///
/// In this particular case, the url `s3://my_bucket/lineitem/` will be provided to
-/// [`ObjectStoreRegistry::get_by_url`] and one of three things will happen:
+/// [`ObjectStoreRegistry::get_store`] and one of three things will happen:
///
/// - If an [`ObjectStore`] has been registered with [`ObjectStoreRegistry::register_store`] with
-/// scheme `s3` and host `my_bucket`, that [`ObjectStore`] will be returned
+/// `s3://my_bucket`, that [`ObjectStore`] will be returned
///
-/// - If an [`ObjectStoreProvider`] has been associated with this [`ObjectStoreRegistry`] using
-/// [`ObjectStoreRegistry::new_with_provider`], [`ObjectStoreProvider::get_by_url`] will be invoked,
-/// and the returned [`ObjectStore`] registered on this [`ObjectStoreRegistry`]. Any error will
-/// be returned to the caller
+/// - If an AWS S3 object store can be ad-hoc discovered by the url `s3://my_bucket/lineitem/`, this
+/// object store will be registered with key `s3://my_bucket` and returned.
///
/// - Otherwise an error will be returned, indicating that no suitable [`ObjectStore`] could
/// be found
@@ -132,18 +115,38 @@ pub trait ObjectStoreProvider: Send + Sync + 'static {
/// buckets using [`ObjectStoreRegistry::register_store`]
///
/// 2. Systems relying on ad-hoc discovery, without corresponding DDL, can create [`ObjectStore`]
-/// lazily, on-demand using [`ObjectStoreProvider`]
+/// lazily by providing a custom implementation of [`ObjectStoreRegistry`]
///
/// [`ListingTableUrl`]: crate::datasource::listing::ListingTableUrl
-pub struct ObjectStoreRegistry {
+pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
+ /// If a store with the same key existed before, it is replaced and returned
+ fn register_store(
+ &self,
+ url: &Url,
+ store: Arc<dyn ObjectStore>,
+ ) -> Option<Arc<dyn ObjectStore>>;
+
+ /// Get a suitable store for the provided URL. For example:
+ ///
+ /// - URL with scheme `file:///` or no scheme will return the default LocalFS store
+ /// - URL with scheme `s3://bucket/` will return the S3 store
+ /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store
+ ///
+ /// If no [`ObjectStore`] found for the `url`, ad-hoc discovery may be executed depending on
+ /// the `url` and [`ObjectStoreRegistry`] implementation. An [`ObjectStore`] may be lazily
+ /// created and registered.
+ fn get_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>>;
+}
+
+/// The default [`ObjectStoreRegistry`]
+pub struct DefaultObjectStoreRegistry {
/// A map from scheme to object store that serve list / read operations for the store
object_stores: DashMap<String, Arc<dyn ObjectStore>>,
- provider: Option<Arc<dyn ObjectStoreProvider>>,
}
-impl std::fmt::Debug for ObjectStoreRegistry {
+impl std::fmt::Debug for DefaultObjectStoreRegistry {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- f.debug_struct("ObjectStoreRegistry")
+ f.debug_struct("DefaultObjectStoreRegistry")
.field(
"schemes",
&self
@@ -156,78 +159,63 @@ impl std::fmt::Debug for ObjectStoreRegistry {
}
}
-impl Default for ObjectStoreRegistry {
+impl Default for DefaultObjectStoreRegistry {
fn default() -> Self {
Self::new()
}
}
-impl ObjectStoreRegistry {
- /// Create an [`ObjectStoreRegistry`] with no [`ObjectStoreProvider`].
- ///
- /// This will register [`LocalFileSystem`] to handle `file://` paths,
further stores
- /// will need to be explicitly registered with calls to
[`ObjectStoreRegistry::register_store`]
+impl DefaultObjectStoreRegistry {
+ /// This will register [`LocalFileSystem`] to handle `file://` paths
pub fn new() -> Self {
- ObjectStoreRegistry::new_with_provider(None)
- }
-
- /// Create an [`ObjectStoreRegistry`] with the provided
[`ObjectStoreProvider`]
- ///
- /// This will register [`LocalFileSystem`] to handle `file://` paths,
further stores
- /// may be explicity registered with calls to
[`ObjectStoreRegistry::register_store`] or
- /// created lazily, on-demand by the provided [`ObjectStoreProvider`]
- pub fn new_with_provider(provider: Option<Arc<dyn ObjectStoreProvider>>)
-> Self {
let object_stores: DashMap<String, Arc<dyn ObjectStore>> = DashMap::new();
object_stores.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
- Self {
- object_stores,
- provider,
- }
+ Self { object_stores }
}
+}
- /// Adds a new store to this registry.
- ///
- /// If a store with the same schema and host existed before, it is
replaced and returned
- pub fn register_store(
+///
+/// Stores are registered based on the scheme, host and port of the provided URL
+/// with a [`LocalFileSystem::new`] automatically registered for `file://`
+///
+/// For example:
+///
+/// - `file:///my_path` will return the default LocalFS store
+/// - `s3://bucket/path` will return a store registered with `s3://bucket` if any
+/// - `hdfs://host:port/path` will return a store registered with `hdfs://host:port` if any
+impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
+ fn register_store(
&self,
- scheme: impl AsRef<str>,
- host: impl AsRef<str>,
+ url: &Url,
store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore>> {
- let s = format!("{}://{}", scheme.as_ref(), host.as_ref());
+ let s = get_url_key(url);
self.object_stores.insert(s, store)
}
- /// Get a suitable store for the provided URL. For example:
- ///
- /// - URL with scheme `file:///` or no schema will return the default
LocalFS store
- /// - URL with scheme `s3://bucket/` will return the S3 store if it's
registered
- /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store
if it's registered
- ///
- pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn
ObjectStore>> {
- let url = url.as_ref();
- // First check whether can get object store from registry
- let s = &url[url::Position::BeforeScheme..url::Position::BeforePath];
- let store = self.object_stores.get(s).map(|o| o.value().clone());
-
- match store {
- Some(store) => Ok(store),
- None => match &self.provider {
- Some(provider) => {
- let store = provider.get_by_url(url)?;
- let key =
-
&url[url::Position::BeforeScheme..url::Position::BeforePath];
- self.object_stores.insert(key.to_owned(), store.clone());
- Ok(store)
- }
- None => Err(DataFusionError::Internal(format!(
+ fn get_store(&self, url: &Url) -> Result<Arc<dyn ObjectStore>> {
+ let s = get_url_key(url);
+ self.object_stores
+ .get(&s)
+ .map(|o| o.value().clone())
+ .ok_or_else(|| {
+ DataFusionError::Internal(format!(
"No suitable object store found for {url}"
- ))),
- },
- }
+ ))
+ })
}
}
+/// Get the key of a url for object store registration.
+/// The credential info will be removed
+fn get_url_key(url: &Url) -> String {
+ format!(
+ "{}://{}",
+ url.scheme(),
+ &url[url::Position::BeforeHost..url::Position::AfterPort],
+ )
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -259,4 +247,19 @@ mod tests {
ObjectStoreUrl::parse("s3://username:password@host:123/foo").unwrap_err();
assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo");
}
+
+ #[test]
+ fn test_get_url_key() {
+ let file = ObjectStoreUrl::parse("file://").unwrap();
+ let key = get_url_key(&file.url);
+ assert_eq!(key.as_str(), "file://");
+
+ let url = ObjectStoreUrl::parse("s3://bucket").unwrap();
+ let key = get_url_key(&url.url);
+ assert_eq!(key.as_str(), "s3://bucket");
+
+ let url = ObjectStoreUrl::parse("s3://username:password@host:123").unwrap();
+ let key = get_url_key(&url.url);
+ assert_eq!(key.as_str(), "s3://host:123");
+ }
}
diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs
index 3163e0e03e..67736edf68 100644
--- a/datafusion/execution/src/runtime_env.rs
+++ b/datafusion/execution/src/runtime_env.rs
@@ -21,7 +21,7 @@
use crate::{
disk_manager::{DiskManager, DiskManagerConfig},
memory_pool::{GreedyMemoryPool, MemoryPool, UnboundedMemoryPool},
- object_store::ObjectStoreRegistry,
+ object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
};
use datafusion_common::{DataFusionError, Result};
@@ -39,7 +39,7 @@ pub struct RuntimeEnv {
/// Manage temporary files during query execution
pub disk_manager: Arc<DiskManager>,
/// Object Store Registry
- pub object_store_registry: Arc<ObjectStoreRegistry>,
+ pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
}
impl Debug for RuntimeEnv {
@@ -67,10 +67,9 @@ impl RuntimeEnv {
})
}
- /// Registers a custom `ObjectStore` to be used when accessing a
- /// specific scheme and host. This allows DataFusion to create
- /// external tables from urls that do not have built in support
- /// such as `hdfs://...`.
+ /// Registers a custom `ObjectStore` to be used with a specific url.
+ /// This allows DataFusion to create external tables from urls that do not have
+ /// built in support such as `hdfs://namenode:port/...`.
///
/// Returns the [`ObjectStore`] previously registered for this
/// scheme, if any.
@@ -78,20 +77,18 @@ impl RuntimeEnv {
/// See [`ObjectStoreRegistry`] for more details
pub fn register_object_store(
&self,
- scheme: impl AsRef<str>,
- host: impl AsRef<str>,
+ url: &Url,
object_store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore>> {
- self.object_store_registry
- .register_store(scheme, host, object_store)
+ self.object_store_registry.register_store(url, object_store)
}
/// Retrieves a `ObjectStore` instance for a url by consulting the
- /// registery. See [`ObjectStoreRegistry::get_by_url`] for more
+ /// registry. See [`ObjectStoreRegistry::get_store`] for more
/// details.
pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
self.object_store_registry
- .get_by_url(url)
+ .get_store(url.as_ref())
.map_err(DataFusionError::from)
}
}
@@ -102,7 +99,7 @@ impl Default for RuntimeEnv {
}
}
-#[derive(Clone, Default)]
+#[derive(Clone)]
/// Execution runtime configuration
pub struct RuntimeConfig {
/// DiskManager to manage temporary disk file usage
@@ -112,13 +109,23 @@ pub struct RuntimeConfig {
/// Defaults to using an [`UnboundedMemoryPool`] if `None`
pub memory_pool: Option<Arc<dyn MemoryPool>>,
/// ObjectStoreRegistry to get object store based on url
- pub object_store_registry: Arc<ObjectStoreRegistry>,
+ pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
+}
+
+impl Default for RuntimeConfig {
+ fn default() -> Self {
+ Self::new()
+ }
}
impl RuntimeConfig {
/// New with default values
pub fn new() -> Self {
- Default::default()
+ Self {
+ disk_manager: Default::default(),
+ memory_pool: Default::default(),
+ object_store_registry: Arc::new(DefaultObjectStoreRegistry::default()),
+ }
}
/// Customize disk manager
@@ -136,7 +143,7 @@ impl RuntimeConfig {
/// Customize object store registry
pub fn with_object_store_registry(
mut self,
- object_store_registry: Arc<ObjectStoreRegistry>,
+ object_store_registry: Arc<dyn ObjectStoreRegistry>,
) -> Self {
self.object_store_registry = object_store_registry;
self