This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch revert-504-upgrade-deps
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git

commit c40258a8736f62f64f162c0ab67535128c3f03f0
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Jan 2 02:03:43 2026 -0600

    Revert "build(deps): upgrade datafusion, arrow, and other deps (#504)"
    
    This reverts commit 7a275c68a3cc053f93b42d8eba2344ae575b76cf.
---
 .gitignore                               |  1 -
 Cargo.toml                               | 50 ++++++++++++++++----------------
 crates/core/src/avro_to_arrow/schema.rs  |  2 --
 crates/core/src/storage/file_metadata.rs |  4 +--
 crates/core/src/storage/mod.rs           |  6 ++--
 crates/datafusion/src/lib.rs             | 46 +++++++++++------------------
 crates/datafusion/src/util/expr.rs       |  4 +--
 python/Cargo.toml                        |  2 +-
 python/pyproject.toml                    |  2 +-
 python/src/internal.rs                   | 23 +++++++--------
 python/tests/test_datafusion_read.py     |  2 +-
 11 files changed, 63 insertions(+), 79 deletions(-)

diff --git a/.gitignore b/.gitignore
index b6e2fa9..51d85c9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,7 +27,6 @@
 venv
 **/.python-version
 __pycache__
-uv.lock
 
 # macOS
 **/.DS_Store
diff --git a/Cargo.toml b/Cargo.toml
index 8ed7e7f..52133a7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,7 +27,7 @@ resolver = "2"
 version = "0.5.0-dev"
 edition = "2021"
 license = "Apache-2.0"
-rust-version = "1.86"
+rust-version = "1.85"
 keywords = ["apachehudi", "hudi", "datalake", "arrow"]
 readme = "README.md"
 description = "The native Rust implementation for Apache Hudi"
@@ -36,30 +36,30 @@ repository = "https://github.com/apache/hudi-rs"
 
 [workspace.dependencies]
 # arrow
-arrow = { version = "~56.0.0"}
-arrow-arith = { version = "~56.0.0" }
-arrow-array = { version = "~56.0.0" }
-arrow-buffer = { version = "~56.0.0" }
-arrow-cast = { version = "~56.0.0" }
-arrow-ipc = { version = "~56.0.0" }
-arrow-json = { version = "~56.0.0" }
-arrow-ord = { version = "~56.0.0" }
-arrow-row = { version = "~56.0.0" }
-arrow-schema = { version = "~56.0.0", features = ["serde"] }
-arrow-select = { version = "~56.0.0" }
-object_store = { version = "~0.12.3", features = ["aws", "azure", "gcp"] }
-parquet = { version = "~56.0.0", features = ["async", "object_store"] }
+arrow = { version = "~54.2.0"}
+arrow-arith = { version = "~54.2.0" }
+arrow-array = { version = "~54.2.0" }
+arrow-buffer = { version = "~54.2.0" }
+arrow-cast = { version = "~54.2.0" }
+arrow-ipc = { version = "~54.2.0" }
+arrow-json = { version = "~54.2.0" }
+arrow-ord = { version = "~54.2.0" }
+arrow-row = { version = "~54.2.0" }
+arrow-schema = { version = "~54.2.0", features = ["serde"] }
+arrow-select = { version = "~54.2.0" }
+object_store = { version = "~0.11.2", features = ["aws", "azure", "gcp"] }
+parquet = { version = "~54.2.0", features = ["async", "object_store"] }
 
 # avro
 apache-avro = { version = "~0.21.0", features = ["derive"] }
 apache-avro-derive = { version = "~0.21.0" }
 
 # datafusion
-datafusion = { version = "=50.1.0" }
-datafusion-expr = { version = "=50.1.0" }
-datafusion-common = { version = "=50.1.0" }
-datafusion-physical-expr = { version = "=50.1.0" }
-datafusion-ffi = { version = "=50.1.0" }
+datafusion = { version = "~46.0.0" }
+datafusion-expr = { version = "~46.0.0" }
+datafusion-common = { version = "~46.0.0" }
+datafusion-physical-expr = { version = "~46.0.0" }
+datafusion-ffi = { version = "~46.0.0" }
 
 # serde
 percent-encoding = { version = "~2.3.1" }
@@ -68,16 +68,16 @@ serde_json = { version = "~1.0" }
 
 # "stdlib"
 thiserror = { version = "~2.0.11" }
-bytes = { version = "~1.11" }
-chrono = { version = "~0.4.42" }
+bytes = { version = "~1.10" }
+chrono = { version = "=0.4.39" }
 lazy_static = { version = "~1.5.0" }
 log = { version = "~0.4" }
 num-traits = { version = "~0.2" }
 once_cell = { version = "~1.21.3" }
 paste = { version = "~1.0.15" }
-strum = { version = "~0.27.2", features = ["derive"] }
-strum_macros = "~0.27.2"
-url = { version = "~2.5.7" }
+strum = { version = "~0.27.0", features = ["derive"] }
+strum_macros = "~0.27.0"
+url = { version = "~2.5.4" }
 
 # runtime / async
 async-recursion = { version = "~1.1.1" }
@@ -87,7 +87,7 @@ futures = { version = "~0.3" }
 tokio = { version = "~1.48", features = ["rt-multi-thread"] }
 
 # protobuf
-prost = { version = "~0.13.1" }
+prost = { version = "~0.13" }
 
 # compression
 flate2 = { version = "^1.1" }
diff --git a/crates/core/src/avro_to_arrow/schema.rs b/crates/core/src/avro_to_arrow/schema.rs
index 4216ffa..baf67fd 100644
--- a/crates/core/src/avro_to_arrow/schema.rs
+++ b/crates/core/src/avro_to_arrow/schema.rs
@@ -223,8 +223,6 @@ fn default_field_name(dt: &DataType) -> &str {
         | DataType::LargeListView(_) => {
             unimplemented!("View support not implemented")
         }
-        DataType::Decimal32(_, _) => "decimal",
-        DataType::Decimal64(_, _) => "decimal",
         DataType::Decimal128(_, _) => "decimal",
         DataType::Decimal256(_, _) => "decimal",
     }
diff --git a/crates/core/src/storage/file_metadata.rs b/crates/core/src/storage/file_metadata.rs
index 87d5ad8..649a94a 100644
--- a/crates/core/src/storage/file_metadata.rs
+++ b/crates/core/src/storage/file_metadata.rs
@@ -23,7 +23,7 @@ pub struct FileMetadata {
     pub name: String,
 
     /// Size in bytes on storage
-    pub size: u64,
+    pub size: usize,
 
     /// Size in bytes in memory
     pub byte_size: i64,
@@ -36,7 +36,7 @@ pub struct FileMetadata {
 }
 
 impl FileMetadata {
-    pub fn new(name: impl Into<String>, size: u64) -> Self {
+    pub fn new(name: impl Into<String>, size: usize) -> Self {
         Self {
             name: name.into(),
             size,
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 6b0777b..d1f4d35 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -128,7 +128,7 @@ impl Storage {
             .filename()
             .ok_or_else(|| InvalidPath(format!("Failed to get file name from: {:?}", &obj_meta)))?;
         let size = obj_meta.size;
-        let reader = ParquetObjectReader::new(obj_store, obj_path).with_file_size(size);
+        let reader = ParquetObjectReader::new(obj_store, obj_meta);
         let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
         let parquet_meta = builder.metadata().clone();
         let num_records = parquet_meta.file_metadata().num_rows();
@@ -151,7 +151,7 @@ impl Storage {
         let obj_path = ObjPath::from_url_path(obj_url.path())?;
         let obj_store = self.object_store.clone();
         let meta = obj_store.head(&obj_path).await?;
-        let reader = ParquetObjectReader::new(obj_store, obj_path).with_file_size(meta.size);
+        let reader = ParquetObjectReader::new(obj_store, meta);
         let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
         Ok(builder.metadata().as_ref().clone())
     }
@@ -189,7 +189,7 @@ impl Storage {
         let meta = obj_store.head(&obj_path).await?;
 
         // read parquet
-        let reader = ParquetObjectReader::new(obj_store, obj_path).with_file_size(meta.size);
+        let reader = ParquetObjectReader::new(obj_store, meta);
         let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
         let schema = builder.schema().clone();
         let mut stream = builder.build()?;
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index e31b09f..662548b 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -31,8 +31,7 @@ use datafusion::catalog::{Session, TableProviderFactory};
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::parquet::source::ParquetSource;
-use datafusion::datasource::physical_plan::FileGroup;
-use datafusion::datasource::physical_plan::FileScanConfigBuilder;
+use datafusion::datasource::physical_plan::FileScanConfig;
 use datafusion::datasource::source::DataSourceExec;
 use datafusion::datasource::TableProvider;
 use datafusion::error::Result;
@@ -144,7 +143,7 @@ impl HudiDataSource {
     fn is_supported_operand(&self, expr: &Expr) -> bool {
         match expr {
             Expr::Column(col) => self.schema().column_with_name(&col.name).is_some(),
-            Expr::Literal(..) => true,
+            Expr::Literal(_) => true,
             _ => false,
         }
     }
@@ -199,7 +198,7 @@ impl TableProvider for HudiDataSource {
                 let url = join_url_segments(&base_url, &[relative_path.as_str()])
                     .map_err(|e| Execution(format!("Failed to join URL segments: {e:?}")))?;
                 let size = f.base_file.file_metadata.as_ref().map_or(0, |m| m.size);
-                let partitioned_file = PartitionedFile::new(url.path(), size);
+                let partitioned_file = PartitionedFile::new(url.path(), size as u64);
                 parquet_file_group_vec.push(partitioned_file);
             }
             parquet_file_groups.push(parquet_file_group_vec)
@@ -212,7 +211,6 @@ impl TableProvider for HudiDataSource {
             global: state.config_options().execution.parquet.clone(),
             column_specific_options: Default::default(),
             key_value_metadata: Default::default(),
-            crypto: Default::default(),
         };
         let table_schema = self.schema();
         let mut parquet_source = ParquetSource::new(parquet_opts);
@@ -220,21 +218,17 @@ impl TableProvider for HudiDataSource {
         if let Some(expr) = filter {
             let df_schema = DFSchema::try_from(table_schema.clone())?;
             let predicate = create_physical_expr(&expr, &df_schema, state.execution_props())?;
-            parquet_source = parquet_source.with_predicate(predicate)
+            parquet_source = parquet_source.with_predicate(table_schema.clone(), predicate)
         }
 
-        let file_groups: Vec<FileGroup> = parquet_file_groups
-            .into_iter()
-            .map(FileGroup::from)
-            .collect();
-
-        let fsc = FileScanConfigBuilder::new(url, table_schema, Arc::new(parquet_source))
-            .with_file_groups(file_groups)
-            .with_projection(projection.cloned())
-            .with_limit(limit)
-            .build();
+        let fsc = Arc::new(
+            FileScanConfig::new(url, table_schema, Arc::new(parquet_source))
+                .with_file_groups(parquet_file_groups)
+                .with_projection(projection.cloned())
+                .with_limit(limit),
+        );
 
-        Ok(Arc::new(DataSourceExec::new(Arc::new(fsc))))
+        Ok(Arc::new(DataSourceExec::new(fsc)))
     }
 
     fn supports_filters_pushdown(
@@ -572,16 +566,13 @@ mod tests {
         let expr0 = Expr::BinaryExpr(BinaryExpr {
             left: Box::new(Expr::Column(Column::from_name("name".to_string()))),
             op: Operator::Eq,
-            right: Box::new(Expr::Literal(
-                ScalarValue::Utf8(Some("Alice".to_string())),
-                None,
-            )),
+            right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("Alice".to_string())))),
         });
 
         let expr1 = Expr::BinaryExpr(BinaryExpr {
             left: Box::new(Expr::Column(Column::from_name("intField".to_string()))),
             op: Operator::Gt,
-            right: Box::new(Expr::Literal(ScalarValue::Int32(Some(20000)), None)),
+            right: Box::new(Expr::Literal(ScalarValue::Int32(Some(20000)))),
         });
 
         let expr2 = Expr::BinaryExpr(BinaryExpr {
@@ -589,24 +580,21 @@ mod tests {
                 "nonexistent_column".to_string(),
             ))),
             op: Operator::Eq,
-            right: Box::new(Expr::Literal(ScalarValue::Int32(Some(1)), None)),
+            right: Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))),
         });
 
         let expr3 = Expr::BinaryExpr(BinaryExpr {
             left: Box::new(Expr::Column(Column::from_name("name".to_string()))),
             op: Operator::NotEq,
-            right: Box::new(Expr::Literal(
-                ScalarValue::Utf8(Some("Diana".to_string())),
-                None,
-            )),
+            right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("Diana".to_string())))),
         });
 
-        let expr4 = Expr::Literal(ScalarValue::Int32(Some(10)), None);
+        let expr4 = Expr::Literal(ScalarValue::Int32(Some(10)));
 
         let expr5 = Expr::Not(Box::new(Expr::BinaryExpr(BinaryExpr {
             left: Box::new(Expr::Column(Column::from_name("intField".to_string()))),
             op: Operator::Gt,
-            right: Box::new(Expr::Literal(ScalarValue::Int32(Some(20000)), None)),
+            right: Box::new(Expr::Literal(ScalarValue::Int32(Some(20000)))),
         })));
 
         let filters = vec![&expr0, &expr1, &expr2, &expr3, &expr4, &expr5];
diff --git a/crates/datafusion/src/util/expr.rs b/crates/datafusion/src/util/expr.rs
index 79a5539..7eb0eed 100644
--- a/crates/datafusion/src/util/expr.rs
+++ b/crates/datafusion/src/util/expr.rs
@@ -50,8 +50,8 @@ pub fn exprs_to_filters(exprs: &[Expr]) -> Vec<(String, String, String)> {
 fn binary_expr_to_filter(binary_expr: &BinaryExpr) -> Option<HudiFilter> {
     // extract the column and literal from the binary expression
     let (column, literal) = match (&*binary_expr.left, &*binary_expr.right) {
-        (Expr::Column(col), Expr::Literal(lit, _)) => (col, lit),
-        (Expr::Literal(lit, _), Expr::Column(col)) => (col, lit),
+        (Expr::Column(col), Expr::Literal(lit)) => (col, lit),
+        (Expr::Literal(lit), Expr::Column(col)) => (col, lit),
         _ => return None,
     };
 
diff --git a/python/Cargo.toml b/python/Cargo.toml
index b8c499c..a87a987 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -48,7 +48,7 @@ futures = { workspace = true }
 tokio = { workspace = true }
 
 [dependencies.pyo3]
-version = "~0.25.1"
+version = "~0.23"
 features = ["extension-module", "abi3", "abi3-py39"]
 
 [features]
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 3582d09..123c0b5 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -53,7 +53,7 @@ lint = [
     "mypy==1.19.1",   
 ]
 datafusion = [
-    "datafusion==50.1.0",
+    "datafusion==46.0.0",
 ]
 
 [tool.maturin]
diff --git a/python/src/internal.rs b/python/src/internal.rs
index b92d39f..3562db2 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -36,8 +36,7 @@ use hudi::table::Table;
 use hudi::timeline::instant::Instant;
 use hudi::timeline::Timeline;
 use pyo3::exceptions::PyException;
-use pyo3::prelude::*;
-use pyo3::{create_exception, pyclass, pyfunction, pymethods, PyErr, PyResult, Python};
+use pyo3::{create_exception, pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python};
 use std::error::Error;
 
 create_exception!(_internal, HudiCoreError, PyException);
@@ -94,12 +93,12 @@ impl HudiFileGroupReader {
         &self,
         relative_path: &str,
         py: Python,
-    ) -> PyResult<Py<PyAny>> {
+    ) -> PyResult<PyObject> {
         rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))
             .map_err(PythonError::from)?
             .to_pyarrow(py)
     }
-    fn read_file_slice(&self, file_slice: &HudiFileSlice, py: Python) -> PyResult<Py<PyAny>> {
+    fn read_file_slice(&self, file_slice: &HudiFileSlice, py: Python) -> PyResult<PyObject> {
         let mut file_group = FileGroup::new_with_base_file_name(
             &file_slice.base_file_name,
             &file_slice.partition_path,
@@ -130,7 +129,7 @@ impl HudiFileGroupReader {
         base_file_path: &str,
         log_file_paths: Vec<String>,
         py: Python,
-    ) -> PyResult<Py<PyAny>> {
+    ) -> PyResult<PyObject> {
         rt().block_on(
             self.inner
                 .read_file_slice_from_paths(base_file_path, log_file_paths),
@@ -153,7 +152,7 @@ pub struct HudiFileSlice {
     #[pyo3(get)]
     base_file_name: String,
     #[pyo3(get)]
-    base_file_size: u64,
+    base_file_size: usize,
     #[pyo3(get)]
     base_file_byte_size: i64,
     #[pyo3(get)]
@@ -326,13 +325,13 @@ impl HudiTable {
         })
     }
 
-    fn get_schema(&self, py: Python) -> PyResult<Py<PyAny>> {
+    fn get_schema(&self, py: Python) -> PyResult<PyObject> {
         rt().block_on(self.inner.get_schema())
             .map_err(PythonError::from)?
             .to_pyarrow(py)
     }
 
-    fn get_partition_schema(&self, py: Python) -> PyResult<Py<PyAny>> {
+    fn get_partition_schema(&self, py: Python) -> PyResult<PyObject> {
         rt().block_on(self.inner.get_partition_schema())
             .map_err(PythonError::from)?
             .to_pyarrow(py)
@@ -456,7 +455,7 @@ impl HudiTable {
         &self,
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
-    ) -> PyResult<Py<PyAny>> {
+    ) -> PyResult<PyObject> {
         rt().block_on(self.inner.read_snapshot(filters.unwrap_or_default()))
             .map_err(PythonError::from)?
             .to_pyarrow(py)
@@ -468,7 +467,7 @@ impl HudiTable {
         timestamp: &str,
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
-    ) -> PyResult<Py<PyAny>> {
+    ) -> PyResult<PyObject> {
         rt().block_on(
             self.inner
                 .read_snapshot_as_of(timestamp, filters.unwrap_or_default()),
@@ -483,7 +482,7 @@ impl HudiTable {
         start_timestamp: &str,
         end_timestamp: Option<&str>,
         py: Python,
-    ) -> PyResult<Py<PyAny>> {
+    ) -> PyResult<PyObject> {
         rt().block_on(
             self.inner
                 .read_incremental_records(start_timestamp, end_timestamp),
@@ -582,7 +581,7 @@ impl HudiTimeline {
         })
     }
 
-    pub fn get_latest_schema(&self, py: Python) -> PyResult<Py<PyAny>> {
+    pub fn get_latest_schema(&self, py: Python) -> PyResult<PyObject> {
         rt().block_on(self.inner.get_latest_schema())
             .map_err(PythonError::from)?
             .to_pyarrow(py)
diff --git a/python/tests/test_datafusion_read.py b/python/tests/test_datafusion_read.py
index 198a65c..5f91047 100644
--- a/python/tests/test_datafusion_read.py
+++ b/python/tests/test_datafusion_read.py
@@ -27,6 +27,6 @@ def test_datafusion_table_registry(get_sample_table):
         table_path, [("hoodie.read.use.read_optimized.mode", "true")]
     )
     ctx = SessionContext()
-    ctx.register_table("trips", table)
+    ctx.register_table_provider("trips", table)
     df = ctx.sql("SELECT  city from trips order by city desc limit 1").to_arrow_table()
     assert df.to_pylist() == [{"city": "sao_paulo"}]

Reply via email to