This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 3359e10 fix: register object store with datafusion (#107)
3359e10 is described below
commit 3359e108b8806c50c008072ef6afbcca86b2e4da
Author: Shaurya <[email protected]>
AuthorDate: Mon Aug 12 18:26:03 2024 +0200
fix: register object store with datafusion (#107)
---------
Co-authored-by: Shiyan Xu <[email protected]>
---
crates/core/Cargo.toml | 14 ++++++++++++++
crates/core/src/storage/mod.rs | 8 ++++++++
crates/core/src/table/fs_view.rs | 2 +-
crates/core/src/table/mod.rs | 13 +++++++++++++
crates/core/src/table/timeline.rs | 2 +-
crates/datafusion/Cargo.toml | 2 +-
crates/datafusion/src/lib.rs | 2 ++
7 files changed, 40 insertions(+), 3 deletions(-)
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index ac000ac..dcaf547 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -60,5 +60,19 @@ dashmap = { workspace = true }
futures = { workspace = true }
tokio = { workspace = true }
+# datafusion
+datafusion = { workspace = true, optional = true }
+datafusion-expr = { workspace = true, optional = true }
+datafusion-common = { workspace = true, optional = true }
+datafusion-physical-expr = { workspace = true, optional = true }
+
[dev-dependencies]
hudi-tests = { path = "../tests" }
+
+[features]
+datafusion = [ # gates the `#[cfg(feature = "datafusion")]` object-store registration APIs
+ "dep:datafusion",
+ "dep:datafusion-expr",
+ "dep:datafusion-common",
+ "dep:datafusion-physical-expr",
+]
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 374f334..c7eb1ec 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -58,6 +58,14 @@ impl Storage {
}
}
+ #[cfg(feature = "datafusion")]
+ pub fn register_object_store( // maps `base_url` to this storage's `object_store` in the DataFusion runtime
+ &self,
+ runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
+ ) {
+ runtime_env.register_object_store(self.base_url.as_ref(), self.object_store.clone());
+ }
+
#[cfg(test)]
async fn get_file_info(&self, relative_path: &str) -> Result<FileInfo> {
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 974bbbc..65cc2a9 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -34,7 +34,7 @@ use crate::storage::{get_leaf_dirs, Storage};
#[allow(dead_code)]
pub struct FileSystemView {
configs: Arc<HudiConfigs>,
- storage: Arc<Storage>,
+ pub(crate) storage: Arc<Storage>, // crate-visible so `Table::register_storage` can register it with DataFusion
partition_to_file_groups: Arc<DashMap<String, Vec<FileGroup>>>,
}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index c08105d..1fe2cb7 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -93,6 +93,19 @@ impl Table {
})
}
+ #[cfg(feature = "datafusion")]
+ pub fn register_storage( // registers the timeline's and the file-system view's object stores in `runtime_env`
+ &self,
+ runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
+ ) {
+ self.timeline
+ .storage
+ .register_object_store(Arc::clone(&runtime_env));
+ self.file_system_view
+ .storage
+ .register_object_store(runtime_env); // last use: move the Arc instead of cloning it
+ }
+
async fn load_configs<I, K, V>(
base_url: Arc<Url>,
all_options: I,
diff --git a/crates/core/src/table/timeline.rs b/crates/core/src/table/timeline.rs
index ae92bac..e502cd6 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -92,7 +92,7 @@ impl Instant {
#[allow(dead_code)]
pub struct Timeline {
configs: Arc<HudiConfigs>,
- storage: Arc<Storage>,
+ pub(crate) storage: Arc<Storage>, // crate-visible so `Table::register_storage` can register it with DataFusion
pub instants: Vec<Instant>,
}
diff --git a/crates/datafusion/Cargo.toml b/crates/datafusion/Cargo.toml
index da0882b..91ce78e 100644
--- a/crates/datafusion/Cargo.toml
+++ b/crates/datafusion/Cargo.toml
@@ -28,7 +28,7 @@ homepage.workspace = true
repository.workspace = true
[dependencies]
-hudi-core = { version = "0.2.0", path = "../core" }
+hudi-core = { version = "0.2.0", path = "../core", features = ["datafusion"] }
# arrow
arrow-schema = { workspace = true }
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 766d3d0..0e40a81 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -97,6 +97,8 @@ impl TableProvider for HudiDataSource {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
+ self.table.register_storage(state.runtime_env().clone());
+
let file_slices = self
.table
.split_file_slices(self.get_input_partitions())