This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new d16ce67  build(deps): upgrade datafusion & arrow, and restrict deps upgrade to patch-level (#386)
d16ce67 is described below

commit d16ce6770a49fead91fe21814962951a4c34d706
Author: Shiyan Xu <[email protected]>
AuthorDate: Sat Jun 28 23:52:21 2025 -0500

    build(deps): upgrade datafusion & arrow, and restrict deps upgrade to patch-level (#386)
    
    DataFusion 45.0 -> 46.0
    Arrow 54.1 -> 54.2
    
    Fix DataFusion API usage based on the new version. Update test cases to cover filter pushdown.
---
 Cargo.toml                               | 70 ++++++++++++++++----------------
 crates/core/src/avro_to_arrow/README.md  |  4 +-
 crates/core/src/table/partition.rs       |  6 +--
 crates/datafusion/src/lib.rs             | 34 +++++++++-------
 crates/test/Cargo.toml                   |  4 +-
 demo/apps/datafusion/Cargo.toml          |  4 +-
 demo/apps/hudi-table-api/rust/Cargo.toml |  4 +-
 python/Cargo.toml                        |  2 +-
 8 files changed, 66 insertions(+), 62 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 2e0f38b..2291648 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -36,50 +36,50 @@ repository = "https://github.com/apache/hudi-rs";
 
 [workspace.dependencies]
 # arrow
-arrow = { version = "~54.1.0", features = ["pyarrow"] }
-arrow-arith = { version = "~54.1.0" }
-arrow-array = { version = "~54.1.0" }
-arrow-buffer = { version = "~54.1.0" }
-arrow-cast = { version = "~54.1.0" }
-arrow-ipc = { version = "~54.1.0" }
-arrow-json = { version = "~54.1.0" }
-arrow-ord = { version = "~54.1.0" }
-arrow-row = { version = "~54.1.0" }
-arrow-schema = { version = "~54.1.0", features = ["serde"] }
-arrow-select = { version = "~54.1.0" }
+arrow = { version = "~54.2.0"}
+arrow-arith = { version = "~54.2.0" }
+arrow-array = { version = "~54.2.0" }
+arrow-buffer = { version = "~54.2.0" }
+arrow-cast = { version = "~54.2.0" }
+arrow-ipc = { version = "~54.2.0" }
+arrow-json = { version = "~54.2.0" }
+arrow-ord = { version = "~54.2.0" }
+arrow-row = { version = "~54.2.0" }
+arrow-schema = { version = "~54.2.0", features = ["serde"] }
+arrow-select = { version = "~54.2.0" }
 object_store = { version = "~0.11.2", features = ["aws", "azure", "gcp"] }
-parquet = { version = "~54.1.0", features = ["async", "object_store"] }
+parquet = { version = "~54.2.0", features = ["async", "object_store"] }
 
 # avro
-apache-avro = { version = "0.17.0" }
+apache-avro = { version = "~0.17.0" }
 
 # datafusion
-datafusion = { version = "~45.0.0" }
-datafusion-expr = { version = "~45.0.0" }
-datafusion-common = { version = "~45.0.0" }
-datafusion-physical-expr = { version = "~45.0.0" }
+datafusion = { version = "~46.0.0" }
+datafusion-expr = { version = "~46.0.0" }
+datafusion-common = { version = "~46.0.0" }
+datafusion-physical-expr = { version = "~46.0.0" }
 
 # serde
-percent-encoding = { version = "2.3.1" }
-serde = { version = "1.0", features = ["derive"] }
-serde_json = { version = "1.0" }
+percent-encoding = { version = "~2.3.1" }
+serde = { version = "~1.0", features = ["derive"] }
+serde_json = { version = "~1.0" }
 
 # "stdlib"
-thiserror = { version = "2.0.11" }
-bytes = { version = "1" }
+thiserror = { version = "~2.0.11" }
+bytes = { version = "~1.10" }
 chrono = { version = "=0.4.39" }
-lazy_static = { version = "1.5.0" }
-log = { version = "0.4" }
-num-traits = { version = "0.2" }
-once_cell = { version = "1.21.3" }
-paste = { version = "1.0.15" }
-strum = { version = "0.27.0", features = ["derive"] }
-strum_macros = "0.27.0"
-url = { version = "2.5" }
+lazy_static = { version = "~1.5.0" }
+log = { version = "~0.4" }
+num-traits = { version = "~0.2" }
+once_cell = { version = "~1.21.3" }
+paste = { version = "~1.0.15" }
+strum = { version = "~0.27.0", features = ["derive"] }
+strum_macros = "~0.27.0"
+url = { version = "~2.5.4" }
 
 # runtime / async
-async-recursion = { version = "1.1.1" }
-async-trait = { version = "0.1" }
-dashmap = { version = "6.1" }
-futures = { version = "0.3" }
-tokio = { version = "1", features = ["rt-multi-thread"] }
+async-recursion = { version = "~1.1.1" }
+async-trait = { version = "~0.1" }
+dashmap = { version = "~6.1" }
+futures = { version = "~0.3" }
+tokio = { version = "~1.45", features = ["rt-multi-thread"] }
diff --git a/crates/core/src/avro_to_arrow/README.md b/crates/core/src/avro_to_arrow/README.md
index 2155382..7449041 100644
--- a/crates/core/src/avro_to_arrow/README.md
+++ b/crates/core/src/avro_to_arrow/README.md
@@ -19,7 +19,7 @@
 
 > [!NOTE]
 > This module is taken
-> from [Apache DataFusion](https://github.com/apache/datafusion/tree/45.0.0/datafusion/core/src/datasource/avro_to_arrow)
+> from [Apache DataFusion](https://github.com/apache/datafusion/tree/46.0.1/datafusion/core/src/datasource/avro_to_arrow)
 > and modified to work with Hudi Avro log block. The original code is licensed 
 > under the Apache License, Version 2.0.
 
 ## Notable Changes
@@ -27,4 +27,4 @@
 - Removed `reader.rs`.
 - Original tests are removed.
 - DataFusion errors are replaced with Hudi errors.
-- Adjust `AvroArrowArrayReader` to work with Iterator of Avro Values.
\ No newline at end of file
+- Adjust `AvroArrowArrayReader` to work with Iterator of Avro Values.
diff --git a/crates/core/src/table/partition.rs b/crates/core/src/table/partition.rs
index 307b428..0378955 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -63,10 +63,10 @@ impl PartitionPruner {
         partition_schema: &Schema,
         hudi_configs: &HudiConfigs,
     ) -> Result<Self> {
-        let and_filters = and_filters
+        let and_filters: Vec<SchemableFilter> = and_filters
             .iter()
-            .map(|filter| SchemableFilter::try_from((filter.clone(), partition_schema)))
-            .collect::<Result<Vec<SchemableFilter>>>()?;
+            .filter_map(|filter| SchemableFilter::try_from((filter.clone(), partition_schema)).ok())
+            .collect();
 
         let schema = Arc::new(partition_schema.clone());
         let is_hive_style: bool = hudi_configs
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index b22a30d..60778a1 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -30,8 +30,9 @@ use async_trait::async_trait;
 use datafusion::catalog::{Session, TableProviderFactory};
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
+use datafusion::datasource::physical_plan::parquet::source::ParquetSource;
 use datafusion::datasource::physical_plan::FileScanConfig;
+use datafusion::datasource::source::DataSourceExec;
 use datafusion::datasource::TableProvider;
 use datafusion::error::Result;
 use datafusion::logical_expr::Operator;
@@ -202,26 +203,29 @@ impl TableProvider for HudiDataSource {
 
         let base_url = self.table.base_url();
         let url = ObjectStoreUrl::parse(get_scheme_authority(&base_url))?;
-        let fsc = FileScanConfig::new(url, self.schema())
-            .with_file_groups(parquet_file_groups)
-            .with_projection(projection.cloned())
-            .with_limit(limit);
 
         let parquet_opts = TableParquetOptions {
             global: state.config_options().execution.parquet.clone(),
             column_specific_options: Default::default(),
             key_value_metadata: Default::default(),
         };
-        let mut exec_builder = ParquetExecBuilder::new_with_options(fsc, parquet_opts);
-
+        let table_schema = self.schema();
+        let mut parquet_source = ParquetSource::new(parquet_opts);
         let filter = filters.iter().cloned().reduce(|acc, new| acc.and(new));
         if let Some(expr) = filter {
-            let df_schema = DFSchema::try_from(self.schema())?;
+            let df_schema = DFSchema::try_from(table_schema.clone())?;
             let predicate = create_physical_expr(&expr, &df_schema, state.execution_props())?;
-            exec_builder = exec_builder.with_predicate(predicate)
+            parquet_source = parquet_source.with_predicate(table_schema.clone(), predicate)
         }
 
-        Ok(exec_builder.build_arc())
+        let fsc = Arc::new(
+            FileScanConfig::new(url, table_schema, Arc::new(parquet_source))
+                .with_file_groups(parquet_file_groups)
+                .with_projection(projection.cloned())
+                .with_limit(limit),
+        );
+
+        Ok(Arc::new(DataSourceExec::new(fsc)))
     }
 
     fn supports_filters_pushdown(
@@ -467,7 +471,7 @@ mod tests {
             table_name
         )));
         assert!(plan_lines[4].starts_with(
-            "FilterExec: CAST(id@0 AS Int64) % 2 = 0 AND get_field(structField@3, field2) > 30"
+            "FilterExec: CAST(id@0 AS Int64) % 2 = 0 AND name@1 != Alice AND get_field(structField@3, field2) > 30"
         ));
         assert!(plan_lines[5].contains(&format!("input_partitions={}", planned_input_partitioned)));
     }
@@ -486,7 +490,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_datafusion_read_hudi_table() {
+    async fn test_datafusion_read_hudi_table_with_partition_filter_pushdown() {
         for (test_table, use_sql, planned_input_partitions) in &[
             (V6ComplexkeygenHivestyle, true, 2),
             (V6Nonpartitioned, true, 1),
@@ -503,7 +507,7 @@ mod tests {
             let sql = format!(
                 r#"
             SELECT id, name, isActive, structField.field2
-            FROM {} WHERE id % 2 = 0
+            FROM {} WHERE id % 2 = 0 AND name != 'Alice'
             AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
                 test_table.as_ref()
             );
@@ -527,7 +531,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_datafusion_read_hudi_table_with_replacecommits() {
+    async fn test_datafusion_read_hudi_table_with_replacecommits_with_partition_filter_pushdown() {
         for (test_table, use_sql, planned_input_partitions) in
             &[(V6SimplekeygenNonhivestyleOverwritetable, true, 1)]
         {
@@ -540,7 +544,7 @@ mod tests {
             let sql = format!(
                 r#"
             SELECT id, name, isActive, structField.field2
-            FROM {} WHERE id % 2 = 0
+            FROM {} WHERE id % 2 = 0 AND name != 'Alice'
             AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
                 test_table.as_ref()
             );
diff --git a/crates/test/Cargo.toml b/crates/test/Cargo.toml
index 1562097..1f129d3 100644
--- a/crates/test/Cargo.toml
+++ b/crates/test/Cargo.toml
@@ -38,5 +38,5 @@ strum_macros = { workspace = true }
 url = { workspace = true }
 
 # testing
-tempfile = "3"
-zip-extract = "0.3"
+tempfile = "3.20.0"
+zip-extract = "0.3.0"
diff --git a/demo/apps/datafusion/Cargo.toml b/demo/apps/datafusion/Cargo.toml
index 4938de9..3bd0064 100644
--- a/demo/apps/datafusion/Cargo.toml
+++ b/demo/apps/datafusion/Cargo.toml
@@ -24,6 +24,6 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
-tokio = "^1"
-datafusion = "~45.0.0"
+tokio = "~1.45"
+datafusion = "~46.0.0"
 hudi = { path = "../../../crates/hudi", features = ["datafusion"] }
diff --git a/demo/apps/hudi-table-api/rust/Cargo.toml b/demo/apps/hudi-table-api/rust/Cargo.toml
index 39139a2..d3164d7 100644
--- a/demo/apps/hudi-table-api/rust/Cargo.toml
+++ b/demo/apps/hudi-table-api/rust/Cargo.toml
@@ -24,7 +24,7 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
-tokio = "^1"
-arrow = { version = "~54.1.0", features = ["pyarrow"] }
+tokio = "~1.45"
+arrow = { version = "~54.2.0" }
 
 hudi = { path = "../../../../crates/hudi" }
diff --git a/python/Cargo.toml b/python/Cargo.toml
index c67147f..3914b96 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -35,7 +35,7 @@ doc = false
 [dependencies]
 hudi = { path = "../crates/hudi" }
 # arrow
-arrow = { workspace = true }
+arrow = { workspace = true, features = ["pyarrow"] }
 
 # "stdlib"
 thiserror = { workspace = true }

Reply via email to