This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a275c6  build(deps): upgrade datafusion, arrow, and other deps (#504)
7a275c6 is described below

commit 7a275c68a3cc053f93b42d8eba2344ae575b76cf
Author: Shiyan Xu <[email protected]>
AuthorDate: Thu Jan 1 17:40:03 2026 -0600

    build(deps): upgrade datafusion, arrow, and other deps (#504)
---
 .gitignore                               |  1 +
 Cargo.toml                               | 50 ++++++++++++++++----------------
 crates/core/src/avro_to_arrow/schema.rs  |  2 ++
 crates/core/src/storage/file_metadata.rs |  4 +--
 crates/core/src/storage/mod.rs           |  6 ++--
 crates/datafusion/src/lib.rs             | 46 ++++++++++++++++++-----------
 crates/datafusion/src/util/expr.rs       |  4 +--
 python/Cargo.toml                        |  2 +-
 python/pyproject.toml                    |  2 +-
 python/src/internal.rs                   | 23 ++++++++-------
 python/tests/test_datafusion_read.py     |  2 +-
 11 files changed, 79 insertions(+), 63 deletions(-)

diff --git a/.gitignore b/.gitignore
index 51d85c9..b6e2fa9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,6 +27,7 @@
 venv
 **/.python-version
 __pycache__
+uv.lock
 
 # macOS
 **/.DS_Store
diff --git a/Cargo.toml b/Cargo.toml
index 52133a7..8ed7e7f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,7 +27,7 @@ resolver = "2"
 version = "0.5.0-dev"
 edition = "2021"
 license = "Apache-2.0"
-rust-version = "1.85"
+rust-version = "1.86"
 keywords = ["apachehudi", "hudi", "datalake", "arrow"]
 readme = "README.md"
 description = "The native Rust implementation for Apache Hudi"
@@ -36,30 +36,30 @@ repository = "https://github.com/apache/hudi-rs"
 
 [workspace.dependencies]
 # arrow
-arrow = { version = "~54.2.0"}
-arrow-arith = { version = "~54.2.0" }
-arrow-array = { version = "~54.2.0" }
-arrow-buffer = { version = "~54.2.0" }
-arrow-cast = { version = "~54.2.0" }
-arrow-ipc = { version = "~54.2.0" }
-arrow-json = { version = "~54.2.0" }
-arrow-ord = { version = "~54.2.0" }
-arrow-row = { version = "~54.2.0" }
-arrow-schema = { version = "~54.2.0", features = ["serde"] }
-arrow-select = { version = "~54.2.0" }
-object_store = { version = "~0.11.2", features = ["aws", "azure", "gcp"] }
-parquet = { version = "~54.2.0", features = ["async", "object_store"] }
+arrow = { version = "~56.0.0"}
+arrow-arith = { version = "~56.0.0" }
+arrow-array = { version = "~56.0.0" }
+arrow-buffer = { version = "~56.0.0" }
+arrow-cast = { version = "~56.0.0" }
+arrow-ipc = { version = "~56.0.0" }
+arrow-json = { version = "~56.0.0" }
+arrow-ord = { version = "~56.0.0" }
+arrow-row = { version = "~56.0.0" }
+arrow-schema = { version = "~56.0.0", features = ["serde"] }
+arrow-select = { version = "~56.0.0" }
+object_store = { version = "~0.12.3", features = ["aws", "azure", "gcp"] }
+parquet = { version = "~56.0.0", features = ["async", "object_store"] }
 
 # avro
 apache-avro = { version = "~0.21.0", features = ["derive"] }
 apache-avro-derive = { version = "~0.21.0" }
 
 # datafusion
-datafusion = { version = "~46.0.0" }
-datafusion-expr = { version = "~46.0.0" }
-datafusion-common = { version = "~46.0.0" }
-datafusion-physical-expr = { version = "~46.0.0" }
-datafusion-ffi = { version = "~46.0.0" }
+datafusion = { version = "=50.1.0" }
+datafusion-expr = { version = "=50.1.0" }
+datafusion-common = { version = "=50.1.0" }
+datafusion-physical-expr = { version = "=50.1.0" }
+datafusion-ffi = { version = "=50.1.0" }
 
 # serde
 percent-encoding = { version = "~2.3.1" }
@@ -68,16 +68,16 @@ serde_json = { version = "~1.0" }
 
 # "stdlib"
 thiserror = { version = "~2.0.11" }
-bytes = { version = "~1.10" }
-chrono = { version = "=0.4.39" }
+bytes = { version = "~1.11" }
+chrono = { version = "~0.4.42" }
 lazy_static = { version = "~1.5.0" }
 log = { version = "~0.4" }
 num-traits = { version = "~0.2" }
 once_cell = { version = "~1.21.3" }
 paste = { version = "~1.0.15" }
-strum = { version = "~0.27.0", features = ["derive"] }
-strum_macros = "~0.27.0"
-url = { version = "~2.5.4" }
+strum = { version = "~0.27.2", features = ["derive"] }
+strum_macros = "~0.27.2"
+url = { version = "~2.5.7" }
 
 # runtime / async
 async-recursion = { version = "~1.1.1" }
@@ -87,7 +87,7 @@ futures = { version = "~0.3" }
 tokio = { version = "~1.48", features = ["rt-multi-thread"] }
 
 # protobuf
-prost = { version = "~0.13" }
+prost = { version = "~0.13.1" }
 
 # compression
 flate2 = { version = "^1.1" }
diff --git a/crates/core/src/avro_to_arrow/schema.rs b/crates/core/src/avro_to_arrow/schema.rs
index baf67fd..4216ffa 100644
--- a/crates/core/src/avro_to_arrow/schema.rs
+++ b/crates/core/src/avro_to_arrow/schema.rs
@@ -223,6 +223,8 @@ fn default_field_name(dt: &DataType) -> &str {
         | DataType::LargeListView(_) => {
             unimplemented!("View support not implemented")
         }
+        DataType::Decimal32(_, _) => "decimal",
+        DataType::Decimal64(_, _) => "decimal",
         DataType::Decimal128(_, _) => "decimal",
         DataType::Decimal256(_, _) => "decimal",
     }
diff --git a/crates/core/src/storage/file_metadata.rs b/crates/core/src/storage/file_metadata.rs
index 649a94a..87d5ad8 100644
--- a/crates/core/src/storage/file_metadata.rs
+++ b/crates/core/src/storage/file_metadata.rs
@@ -23,7 +23,7 @@ pub struct FileMetadata {
     pub name: String,
 
     /// Size in bytes on storage
-    pub size: usize,
+    pub size: u64,
 
     /// Size in bytes in memory
     pub byte_size: i64,
@@ -36,7 +36,7 @@ pub struct FileMetadata {
 }
 
 impl FileMetadata {
-    pub fn new(name: impl Into<String>, size: usize) -> Self {
+    pub fn new(name: impl Into<String>, size: u64) -> Self {
         Self {
             name: name.into(),
             size,
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index d1f4d35..6b0777b 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -128,7 +128,7 @@ impl Storage {
             .filename()
             .ok_or_else(|| InvalidPath(format!("Failed to get file name from: {:?}", &obj_meta)))?;
         let size = obj_meta.size;
-        let reader = ParquetObjectReader::new(obj_store, obj_meta);
+        let reader = ParquetObjectReader::new(obj_store, obj_path).with_file_size(size);
         let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
         let parquet_meta = builder.metadata().clone();
         let num_records = parquet_meta.file_metadata().num_rows();
@@ -151,7 +151,7 @@ impl Storage {
         let obj_path = ObjPath::from_url_path(obj_url.path())?;
         let obj_store = self.object_store.clone();
         let meta = obj_store.head(&obj_path).await?;
-        let reader = ParquetObjectReader::new(obj_store, meta);
+        let reader = ParquetObjectReader::new(obj_store, obj_path).with_file_size(meta.size);
         let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
         Ok(builder.metadata().as_ref().clone())
     }
@@ -189,7 +189,7 @@ impl Storage {
         let meta = obj_store.head(&obj_path).await?;
 
         // read parquet
-        let reader = ParquetObjectReader::new(obj_store, meta);
+        let reader = ParquetObjectReader::new(obj_store, obj_path).with_file_size(meta.size);
         let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
         let schema = builder.schema().clone();
         let mut stream = builder.build()?;
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 662548b..e31b09f 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -31,7 +31,8 @@ use datafusion::catalog::{Session, TableProviderFactory};
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::parquet::source::ParquetSource;
-use datafusion::datasource::physical_plan::FileScanConfig;
+use datafusion::datasource::physical_plan::FileGroup;
+use datafusion::datasource::physical_plan::FileScanConfigBuilder;
 use datafusion::datasource::source::DataSourceExec;
 use datafusion::datasource::TableProvider;
 use datafusion::error::Result;
@@ -143,7 +144,7 @@ impl HudiDataSource {
     fn is_supported_operand(&self, expr: &Expr) -> bool {
         match expr {
             Expr::Column(col) => self.schema().column_with_name(&col.name).is_some(),
-            Expr::Literal(_) => true,
+            Expr::Literal(..) => true,
             _ => false,
         }
     }
@@ -198,7 +199,7 @@ impl TableProvider for HudiDataSource {
                 let url = join_url_segments(&base_url, &[relative_path.as_str()])
                     .map_err(|e| Execution(format!("Failed to join URL segments: {e:?}")))?;
                 let size = f.base_file.file_metadata.as_ref().map_or(0, |m| m.size);
-                let partitioned_file = PartitionedFile::new(url.path(), size as u64);
+                let partitioned_file = PartitionedFile::new(url.path(), size);
                 parquet_file_group_vec.push(partitioned_file);
             }
             parquet_file_groups.push(parquet_file_group_vec)
@@ -211,6 +212,7 @@ impl TableProvider for HudiDataSource {
             global: state.config_options().execution.parquet.clone(),
             column_specific_options: Default::default(),
             key_value_metadata: Default::default(),
+            crypto: Default::default(),
         };
         let table_schema = self.schema();
         let mut parquet_source = ParquetSource::new(parquet_opts);
@@ -218,17 +220,21 @@ impl TableProvider for HudiDataSource {
         if let Some(expr) = filter {
             let df_schema = DFSchema::try_from(table_schema.clone())?;
             let predicate = create_physical_expr(&expr, &df_schema, state.execution_props())?;
-            parquet_source = parquet_source.with_predicate(table_schema.clone(), predicate)
+            parquet_source = parquet_source.with_predicate(predicate)
         }
 
-        let fsc = Arc::new(
-            FileScanConfig::new(url, table_schema, Arc::new(parquet_source))
-                .with_file_groups(parquet_file_groups)
-                .with_projection(projection.cloned())
-                .with_limit(limit),
-        );
+        let file_groups: Vec<FileGroup> = parquet_file_groups
+            .into_iter()
+            .map(FileGroup::from)
+            .collect();
+
+        let fsc = FileScanConfigBuilder::new(url, table_schema, Arc::new(parquet_source))
+            .with_file_groups(file_groups)
+            .with_projection(projection.cloned())
+            .with_limit(limit)
+            .build();
 
-        Ok(Arc::new(DataSourceExec::new(fsc)))
+        Ok(Arc::new(DataSourceExec::new(Arc::new(fsc))))
     }
 
     fn supports_filters_pushdown(
@@ -566,13 +572,16 @@ mod tests {
         let expr0 = Expr::BinaryExpr(BinaryExpr {
             left: Box::new(Expr::Column(Column::from_name("name".to_string()))),
             op: Operator::Eq,
-            right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("Alice".to_string())))),
+            right: Box::new(Expr::Literal(
+                ScalarValue::Utf8(Some("Alice".to_string())),
+                None,
+            )),
         });
 
         let expr1 = Expr::BinaryExpr(BinaryExpr {
             left: Box::new(Expr::Column(Column::from_name("intField".to_string()))),
             op: Operator::Gt,
-            right: Box::new(Expr::Literal(ScalarValue::Int32(Some(20000)))),
+            right: Box::new(Expr::Literal(ScalarValue::Int32(Some(20000)), None)),
         });
 
         let expr2 = Expr::BinaryExpr(BinaryExpr {
@@ -580,21 +589,24 @@ mod tests {
                 "nonexistent_column".to_string(),
             ))),
             op: Operator::Eq,
-            right: Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))),
+            right: Box::new(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
         });
 
         let expr3 = Expr::BinaryExpr(BinaryExpr {
             left: Box::new(Expr::Column(Column::from_name("name".to_string()))),
             op: Operator::NotEq,
-            right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("Diana".to_string())))),
+            right: Box::new(Expr::Literal(
+                ScalarValue::Utf8(Some("Diana".to_string())),
+                None,
+            )),
         });
 
-        let expr4 = Expr::Literal(ScalarValue::Int32(Some(10)));
+        let expr4 = Expr::Literal(ScalarValue::Int32(Some(10)), None);
 
         let expr5 = Expr::Not(Box::new(Expr::BinaryExpr(BinaryExpr {
             left: Box::new(Expr::Column(Column::from_name("intField".to_string()))),
             op: Operator::Gt,
-            right: Box::new(Expr::Literal(ScalarValue::Int32(Some(20000)))),
+            right: Box::new(Expr::Literal(ScalarValue::Int32(Some(20000)), None)),
         })));
 
         let filters = vec![&expr0, &expr1, &expr2, &expr3, &expr4, &expr5];
diff --git a/crates/datafusion/src/util/expr.rs b/crates/datafusion/src/util/expr.rs
index 7eb0eed..79a5539 100644
--- a/crates/datafusion/src/util/expr.rs
+++ b/crates/datafusion/src/util/expr.rs
@@ -50,8 +50,8 @@ pub fn exprs_to_filters(exprs: &[Expr]) -> Vec<(String, String, String)> {
 fn binary_expr_to_filter(binary_expr: &BinaryExpr) -> Option<HudiFilter> {
     // extract the column and literal from the binary expression
     let (column, literal) = match (&*binary_expr.left, &*binary_expr.right) {
-        (Expr::Column(col), Expr::Literal(lit)) => (col, lit),
-        (Expr::Literal(lit), Expr::Column(col)) => (col, lit),
+        (Expr::Column(col), Expr::Literal(lit, _)) => (col, lit),
+        (Expr::Literal(lit, _), Expr::Column(col)) => (col, lit),
         _ => return None,
     };
 
diff --git a/python/Cargo.toml b/python/Cargo.toml
index a87a987..b8c499c 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -48,7 +48,7 @@ futures = { workspace = true }
 tokio = { workspace = true }
 
 [dependencies.pyo3]
-version = "~0.23"
+version = "~0.25.1"
 features = ["extension-module", "abi3", "abi3-py39"]
 
 [features]
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 123c0b5..3582d09 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -53,7 +53,7 @@ lint = [
     "mypy==1.19.1",   
 ]
 datafusion = [
-    "datafusion==46.0.0",
+    "datafusion==50.1.0",
 ]
 
 [tool.maturin]
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 3562db2..b92d39f 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -36,7 +36,8 @@ use hudi::table::Table;
 use hudi::timeline::instant::Instant;
 use hudi::timeline::Timeline;
 use pyo3::exceptions::PyException;
-use pyo3::{create_exception, pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python};
+use pyo3::prelude::*;
+use pyo3::{create_exception, pyclass, pyfunction, pymethods, PyErr, PyResult, Python};
 use std::error::Error;
 
 create_exception!(_internal, HudiCoreError, PyException);
@@ -93,12 +94,12 @@ impl HudiFileGroupReader {
         &self,
         relative_path: &str,
         py: Python,
-    ) -> PyResult<PyObject> {
+    ) -> PyResult<Py<PyAny>> {
         rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))
             .map_err(PythonError::from)?
             .to_pyarrow(py)
     }
-    fn read_file_slice(&self, file_slice: &HudiFileSlice, py: Python) -> PyResult<PyObject> {
+    fn read_file_slice(&self, file_slice: &HudiFileSlice, py: Python) -> PyResult<Py<PyAny>> {
         let mut file_group = FileGroup::new_with_base_file_name(
             &file_slice.base_file_name,
             &file_slice.partition_path,
@@ -129,7 +130,7 @@ impl HudiFileGroupReader {
         base_file_path: &str,
         log_file_paths: Vec<String>,
         py: Python,
-    ) -> PyResult<PyObject> {
+    ) -> PyResult<Py<PyAny>> {
         rt().block_on(
             self.inner
                 .read_file_slice_from_paths(base_file_path, log_file_paths),
@@ -152,7 +153,7 @@ pub struct HudiFileSlice {
     #[pyo3(get)]
     base_file_name: String,
     #[pyo3(get)]
-    base_file_size: usize,
+    base_file_size: u64,
     #[pyo3(get)]
     base_file_byte_size: i64,
     #[pyo3(get)]
@@ -325,13 +326,13 @@ impl HudiTable {
         })
     }
 
-    fn get_schema(&self, py: Python) -> PyResult<PyObject> {
+    fn get_schema(&self, py: Python) -> PyResult<Py<PyAny>> {
         rt().block_on(self.inner.get_schema())
             .map_err(PythonError::from)?
             .to_pyarrow(py)
     }
 
-    fn get_partition_schema(&self, py: Python) -> PyResult<PyObject> {
+    fn get_partition_schema(&self, py: Python) -> PyResult<Py<PyAny>> {
         rt().block_on(self.inner.get_partition_schema())
             .map_err(PythonError::from)?
             .to_pyarrow(py)
@@ -455,7 +456,7 @@ impl HudiTable {
         &self,
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
-    ) -> PyResult<PyObject> {
+    ) -> PyResult<Py<PyAny>> {
         rt().block_on(self.inner.read_snapshot(filters.unwrap_or_default()))
             .map_err(PythonError::from)?
             .to_pyarrow(py)
@@ -467,7 +468,7 @@ impl HudiTable {
         timestamp: &str,
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
-    ) -> PyResult<PyObject> {
+    ) -> PyResult<Py<PyAny>> {
         rt().block_on(
             self.inner
                 .read_snapshot_as_of(timestamp, filters.unwrap_or_default()),
@@ -482,7 +483,7 @@ impl HudiTable {
         start_timestamp: &str,
         end_timestamp: Option<&str>,
         py: Python,
-    ) -> PyResult<PyObject> {
+    ) -> PyResult<Py<PyAny>> {
         rt().block_on(
             self.inner
                 .read_incremental_records(start_timestamp, end_timestamp),
@@ -581,7 +582,7 @@ impl HudiTimeline {
         })
     }
 
-    pub fn get_latest_schema(&self, py: Python) -> PyResult<PyObject> {
+    pub fn get_latest_schema(&self, py: Python) -> PyResult<Py<PyAny>> {
         rt().block_on(self.inner.get_latest_schema())
             .map_err(PythonError::from)?
             .to_pyarrow(py)
diff --git a/python/tests/test_datafusion_read.py b/python/tests/test_datafusion_read.py
index 5f91047..198a65c 100644
--- a/python/tests/test_datafusion_read.py
+++ b/python/tests/test_datafusion_read.py
@@ -27,6 +27,6 @@ def test_datafusion_table_registry(get_sample_table):
         table_path, [("hoodie.read.use.read_optimized.mode", "true")]
     )
     ctx = SessionContext()
-    ctx.register_table_provider("trips", table)
+    ctx.register_table("trips", table)
     df = ctx.sql("SELECT  city from trips order by city desc limit 1").to_arrow_table()
     assert df.to_pylist() == [{"city": "sao_paulo"}]

Reply via email to