This is an automated email from the ASF dual-hosted git repository. xushiyan pushed a commit to branch revert-504-upgrade-deps in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
commit c40258a8736f62f64f162c0ab67535128c3f03f0 Author: Shiyan Xu <[email protected]> AuthorDate: Fri Jan 2 02:03:43 2026 -0600 Revert "build(deps): upgrade datafusion, arrow, and other deps (#504)" This reverts commit 7a275c68a3cc053f93b42d8eba2344ae575b76cf. --- .gitignore | 1 - Cargo.toml | 50 ++++++++++++++++---------------- crates/core/src/avro_to_arrow/schema.rs | 2 -- crates/core/src/storage/file_metadata.rs | 4 +-- crates/core/src/storage/mod.rs | 6 ++-- crates/datafusion/src/lib.rs | 46 +++++++++++------------------ crates/datafusion/src/util/expr.rs | 4 +-- python/Cargo.toml | 2 +- python/pyproject.toml | 2 +- python/src/internal.rs | 23 +++++++-------- python/tests/test_datafusion_read.py | 2 +- 11 files changed, 63 insertions(+), 79 deletions(-) diff --git a/.gitignore b/.gitignore index b6e2fa9..51d85c9 100644 --- a/.gitignore +++ b/.gitignore @@ -27,7 +27,6 @@ venv **/.python-version __pycache__ -uv.lock # macOS **/.DS_Store diff --git a/Cargo.toml b/Cargo.toml index 8ed7e7f..52133a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ resolver = "2" version = "0.5.0-dev" edition = "2021" license = "Apache-2.0" -rust-version = "1.86" +rust-version = "1.85" keywords = ["apachehudi", "hudi", "datalake", "arrow"] readme = "README.md" description = "The native Rust implementation for Apache Hudi" @@ -36,30 +36,30 @@ repository = "https://github.com/apache/hudi-rs" [workspace.dependencies] # arrow -arrow = { version = "~56.0.0"} -arrow-arith = { version = "~56.0.0" } -arrow-array = { version = "~56.0.0" } -arrow-buffer = { version = "~56.0.0" } -arrow-cast = { version = "~56.0.0" } -arrow-ipc = { version = "~56.0.0" } -arrow-json = { version = "~56.0.0" } -arrow-ord = { version = "~56.0.0" } -arrow-row = { version = "~56.0.0" } -arrow-schema = { version = "~56.0.0", features = ["serde"] } -arrow-select = { version = "~56.0.0" } -object_store = { version = "~0.12.3", features = ["aws", "azure", "gcp"] } -parquet = { version = "~56.0.0", features = ["async", "object_store"] } +arrow = { version = "~54.2.0"} +arrow-arith = { version = "~54.2.0" } +arrow-array = { version = "~54.2.0" } +arrow-buffer = { version = "~54.2.0" } +arrow-cast = { version = "~54.2.0" } +arrow-ipc = { version = "~54.2.0" } +arrow-json = { version = "~54.2.0" } +arrow-ord = { version = "~54.2.0" } +arrow-row = { version = "~54.2.0" } +arrow-schema = { version = "~54.2.0", features = ["serde"] } +arrow-select = { version = "~54.2.0" } +object_store = { version = "~0.11.2", features = ["aws", "azure", "gcp"] } +parquet = { version = "~54.2.0", features = ["async", "object_store"] } # avro apache-avro = { version = "~0.21.0", features = ["derive"] } apache-avro-derive = { version = "~0.21.0" } # datafusion -datafusion = { version = "=50.1.0" } -datafusion-expr = { version = "=50.1.0" } -datafusion-common = { version = "=50.1.0" } -datafusion-physical-expr = { version = "=50.1.0" } -datafusion-ffi = { version = "=50.1.0" } +datafusion = { version = "~46.0.0" } +datafusion-expr = { version = "~46.0.0" } +datafusion-common = { version = "~46.0.0" } +datafusion-physical-expr = { version = "~46.0.0" } +datafusion-ffi = { version = "~46.0.0" } # serde percent-encoding = { version = "~2.3.1" } @@ -68,16 +68,16 @@ serde_json = { version = "~1.0" } # "stdlib" thiserror = { version = "~2.0.11" } -bytes = { version = "~1.11" } -chrono = { version = "~0.4.42" } +bytes = { version = "~1.10" } +chrono = { version = "=0.4.39" } lazy_static = { version = "~1.5.0" } log = { version = "~0.4" } num-traits = { version = "~0.2" } once_cell = { version = "~1.21.3" } paste = { version = "~1.0.15" } -strum = { version = "~0.27.2", features = ["derive"] } -strum_macros = "~0.27.2" -url = { version = "~2.5.7" } +strum = { version = "~0.27.0", features = ["derive"] } +strum_macros = "~0.27.0" +url = { version = "~2.5.4" } # runtime / async async-recursion = { version = "~1.1.1" } @@ -87,7 +87,7 @@ futures = { version = "~0.3" } tokio = { version = "~1.48", features = ["rt-multi-thread"] } # protobuf -prost = { version = "~0.13.1" } +prost = { version = "~0.13" } # compression flate2 = { version = "^1.1" } diff --git a/crates/core/src/avro_to_arrow/schema.rs b/crates/core/src/avro_to_arrow/schema.rs index 4216ffa..baf67fd 100644 --- a/crates/core/src/avro_to_arrow/schema.rs +++ b/crates/core/src/avro_to_arrow/schema.rs @@ -223,8 +223,6 @@ fn default_field_name(dt: &DataType) -> &str { | DataType::LargeListView(_) => { unimplemented!("View support not implemented") } - DataType::Decimal32(_, _) => "decimal", - DataType::Decimal64(_, _) => "decimal", DataType::Decimal128(_, _) => "decimal", DataType::Decimal256(_, _) => "decimal", } diff --git a/crates/core/src/storage/file_metadata.rs b/crates/core/src/storage/file_metadata.rs index 87d5ad8..649a94a 100644 --- a/crates/core/src/storage/file_metadata.rs +++ b/crates/core/src/storage/file_metadata.rs @@ -23,7 +23,7 @@ pub struct FileMetadata { pub name: String, /// Size in bytes on storage - pub size: u64, + pub size: usize, /// Size in bytes in memory pub byte_size: i64, @@ -36,7 +36,7 @@ pub struct FileMetadata { } impl FileMetadata { - pub fn new(name: impl Into<String>, size: u64) -> Self { + pub fn new(name: impl Into<String>, size: usize) -> Self { Self { name: name.into(), size, diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index 6b0777b..d1f4d35 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -128,7 +128,7 @@ impl Storage { .filename() .ok_or_else(|| InvalidPath(format!("Failed to get file name from: {:?}", &obj_meta)))?; let size = obj_meta.size; - let reader = ParquetObjectReader::new(obj_store, obj_path).with_file_size(size); + let reader = ParquetObjectReader::new(obj_store, obj_meta); let builder = ParquetRecordBatchStreamBuilder::new(reader).await?; let parquet_meta = builder.metadata().clone(); let num_records = parquet_meta.file_metadata().num_rows(); @@ -151,7 +151,7 @@ impl Storage { let obj_path = ObjPath::from_url_path(obj_url.path())?; let obj_store = self.object_store.clone(); let meta = obj_store.head(&obj_path).await?; - let reader = ParquetObjectReader::new(obj_store, obj_path).with_file_size(meta.size); + let reader = ParquetObjectReader::new(obj_store, meta); let builder = ParquetRecordBatchStreamBuilder::new(reader).await?; Ok(builder.metadata().as_ref().clone()) } @@ -189,7 +189,7 @@ impl Storage { let meta = obj_store.head(&obj_path).await?; // read parquet - let reader = ParquetObjectReader::new(obj_store, obj_path).with_file_size(meta.size); + let reader = ParquetObjectReader::new(obj_store, meta); let builder = ParquetRecordBatchStreamBuilder::new(reader).await?; let schema = builder.schema().clone(); let mut stream = builder.build()?; diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs index e31b09f..662548b 100644 --- a/crates/datafusion/src/lib.rs +++ b/crates/datafusion/src/lib.rs @@ -31,8 +31,7 @@ use datafusion::catalog::{Session, TableProviderFactory}; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::parquet::source::ParquetSource; -use datafusion::datasource::physical_plan::FileGroup; -use datafusion::datasource::physical_plan::FileScanConfigBuilder; +use datafusion::datasource::physical_plan::FileScanConfig; use datafusion::datasource::source::DataSourceExec; use datafusion::datasource::TableProvider; use datafusion::error::Result; @@ -144,7 +143,7 @@ impl HudiDataSource { fn is_supported_operand(&self, expr: &Expr) -> bool { match expr { Expr::Column(col) => self.schema().column_with_name(&col.name).is_some(), - Expr::Literal(..) => true, + Expr::Literal(_) => true, _ => false, } } @@ -199,7 +198,7 @@ impl TableProvider for HudiDataSource { let url = join_url_segments(&base_url, &[relative_path.as_str()]) .map_err(|e| Execution(format!("Failed to join URL segments: {e:?}")))?; let size = f.base_file.file_metadata.as_ref().map_or(0, |m| m.size); - let partitioned_file = PartitionedFile::new(url.path(), size); + let partitioned_file = PartitionedFile::new(url.path(), size as u64); parquet_file_group_vec.push(partitioned_file); } parquet_file_groups.push(parquet_file_group_vec) @@ -212,7 +211,6 @@ impl TableProvider for HudiDataSource { global: state.config_options().execution.parquet.clone(), column_specific_options: Default::default(), key_value_metadata: Default::default(), - crypto: Default::default(), }; let table_schema = self.schema(); let mut parquet_source = ParquetSource::new(parquet_opts); @@ -220,21 +218,17 @@ impl TableProvider for HudiDataSource { if let Some(expr) = filter { let df_schema = DFSchema::try_from(table_schema.clone())?; let predicate = create_physical_expr(&expr, &df_schema, state.execution_props())?; - parquet_source = parquet_source.with_predicate(predicate) + parquet_source = parquet_source.with_predicate(table_schema.clone(), predicate) } - let file_groups: Vec<FileGroup> = parquet_file_groups - .into_iter() - .map(FileGroup::from) - .collect(); - - let fsc = FileScanConfigBuilder::new(url, table_schema, Arc::new(parquet_source)) - .with_file_groups(file_groups) - .with_projection(projection.cloned()) - .with_limit(limit) - .build(); + let fsc = Arc::new( + FileScanConfig::new(url, table_schema, Arc::new(parquet_source)) + .with_file_groups(parquet_file_groups) + .with_projection(projection.cloned()) + .with_limit(limit), + ); - Ok(Arc::new(DataSourceExec::new(Arc::new(fsc)))) + Ok(Arc::new(DataSourceExec::new(fsc))) } fn supports_filters_pushdown( @@ -572,16 +566,13 @@ mod tests { let expr0 = Expr::BinaryExpr(BinaryExpr { left: Box::new(Expr::Column(Column::from_name("name".to_string()))), op: Operator::Eq, - right: Box::new(Expr::Literal( - ScalarValue::Utf8(Some("Alice".to_string())), - None, - )), + right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("Alice".to_string())))), }); let expr1 = Expr::BinaryExpr(BinaryExpr { left: Box::new(Expr::Column(Column::from_name("intField".to_string()))), op: Operator::Gt, - right: Box::new(Expr::Literal(ScalarValue::Int32(Some(20000)), None)), + right: Box::new(Expr::Literal(ScalarValue::Int32(Some(20000)))), }); let expr2 = Expr::BinaryExpr(BinaryExpr { @@ -589,24 +580,21 @@ mod tests { "nonexistent_column".to_string(), ))), op: Operator::Eq, - right: Box::new(Expr::Literal(ScalarValue::Int32(Some(1)), None)), + right: Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))), }); let expr3 = Expr::BinaryExpr(BinaryExpr { left: Box::new(Expr::Column(Column::from_name("name".to_string()))), op: Operator::NotEq, - right: Box::new(Expr::Literal( - ScalarValue::Utf8(Some("Diana".to_string())), - None, - )), + right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("Diana".to_string())))), }); - let expr4 = Expr::Literal(ScalarValue::Int32(Some(10)), None); + let expr4 = Expr::Literal(ScalarValue::Int32(Some(10))); let expr5 = Expr::Not(Box::new(Expr::BinaryExpr(BinaryExpr { left: Box::new(Expr::Column(Column::from_name("intField".to_string()))), op: Operator::Gt, - right: Box::new(Expr::Literal(ScalarValue::Int32(Some(20000)), None)), + right: Box::new(Expr::Literal(ScalarValue::Int32(Some(20000)))), }))); let filters = vec![&expr0, &expr1, &expr2, &expr3, &expr4, &expr5]; diff --git a/crates/datafusion/src/util/expr.rs b/crates/datafusion/src/util/expr.rs index 79a5539..7eb0eed 100644 --- a/crates/datafusion/src/util/expr.rs +++ b/crates/datafusion/src/util/expr.rs @@ -50,8 +50,8 @@ pub fn exprs_to_filters(exprs: &[Expr]) -> Vec<(String, String, String)> { fn binary_expr_to_filter(binary_expr: &BinaryExpr) -> Option<HudiFilter> { // extract the column and literal from the binary expression let (column, literal) = match (&*binary_expr.left, &*binary_expr.right) { - (Expr::Column(col), Expr::Literal(lit, _)) => (col, lit), - (Expr::Literal(lit, _), Expr::Column(col)) => (col, lit), + (Expr::Column(col), Expr::Literal(lit)) => (col, lit), + (Expr::Literal(lit), Expr::Column(col)) => (col, lit), _ => return None, }; diff --git a/python/Cargo.toml b/python/Cargo.toml index b8c499c..a87a987 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -48,7 +48,7 @@ futures = { workspace = true } tokio = { workspace = true } [dependencies.pyo3] -version = "~0.25.1" +version = "~0.23" features = ["extension-module", "abi3", "abi3-py39"] [features] diff --git a/python/pyproject.toml b/python/pyproject.toml index 3582d09..123c0b5 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -53,7 +53,7 @@ lint = [ "mypy==1.19.1", ] datafusion = [ - "datafusion==50.1.0", + "datafusion==46.0.0", ] [tool.maturin] diff --git a/python/src/internal.rs b/python/src/internal.rs index b92d39f..3562db2 100644 --- a/python/src/internal.rs +++ b/python/src/internal.rs @@ -36,8 +36,7 @@ use hudi::table::Table; use hudi::timeline::instant::Instant; use hudi::timeline::Timeline; use pyo3::exceptions::PyException; -use pyo3::prelude::*; -use pyo3::{create_exception, pyclass, pyfunction, pymethods, PyErr, PyResult, Python}; +use pyo3::{create_exception, pyclass, pyfunction, pymethods, PyErr, PyObject, PyResult, Python}; use std::error::Error; create_exception!(_internal, HudiCoreError, PyException); @@ -94,12 +93,12 @@ impl HudiFileGroupReader { &self, relative_path: &str, py: Python, - ) -> PyResult<Py<PyAny>> { + ) -> PyResult<PyObject> { rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path)) .map_err(PythonError::from)? .to_pyarrow(py) } - fn read_file_slice(&self, file_slice: &HudiFileSlice, py: Python) -> PyResult<Py<PyAny>> { + fn read_file_slice(&self, file_slice: &HudiFileSlice, py: Python) -> PyResult<PyObject> { let mut file_group = FileGroup::new_with_base_file_name( &file_slice.base_file_name, &file_slice.partition_path, @@ -130,7 +129,7 @@ impl HudiFileGroupReader { base_file_path: &str, log_file_paths: Vec<String>, py: Python, - ) -> PyResult<Py<PyAny>> { + ) -> PyResult<PyObject> { rt().block_on( self.inner .read_file_slice_from_paths(base_file_path, log_file_paths), @@ -153,7 +152,7 @@ pub struct HudiFileSlice { #[pyo3(get)] base_file_name: String, #[pyo3(get)] - base_file_size: u64, + base_file_size: usize, #[pyo3(get)] base_file_byte_size: i64, #[pyo3(get)] @@ -326,13 +325,13 @@ impl HudiTable { }) } - fn get_schema(&self, py: Python) -> PyResult<Py<PyAny>> { + fn get_schema(&self, py: Python) -> PyResult<PyObject> { rt().block_on(self.inner.get_schema()) .map_err(PythonError::from)? .to_pyarrow(py) } - fn get_partition_schema(&self, py: Python) -> PyResult<Py<PyAny>> { + fn get_partition_schema(&self, py: Python) -> PyResult<PyObject> { rt().block_on(self.inner.get_partition_schema()) .map_err(PythonError::from)? .to_pyarrow(py) @@ -456,7 +455,7 @@ impl HudiTable { &self, filters: Option<Vec<(String, String, String)>>, py: Python, - ) -> PyResult<Py<PyAny>> { + ) -> PyResult<PyObject> { rt().block_on(self.inner.read_snapshot(filters.unwrap_or_default())) .map_err(PythonError::from)? .to_pyarrow(py) @@ -468,7 +467,7 @@ impl HudiTable { timestamp: &str, filters: Option<Vec<(String, String, String)>>, py: Python, - ) -> PyResult<Py<PyAny>> { + ) -> PyResult<PyObject> { rt().block_on( self.inner .read_snapshot_as_of(timestamp, filters.unwrap_or_default()), @@ -483,7 +482,7 @@ impl HudiTable { start_timestamp: &str, end_timestamp: Option<&str>, py: Python, - ) -> PyResult<Py<PyAny>> { + ) -> PyResult<PyObject> { rt().block_on( self.inner .read_incremental_records(start_timestamp, end_timestamp), @@ -582,7 +581,7 @@ impl HudiTimeline { }) } - pub fn get_latest_schema(&self, py: Python) -> PyResult<Py<PyAny>> { + pub fn get_latest_schema(&self, py: Python) -> PyResult<PyObject> { rt().block_on(self.inner.get_latest_schema()) .map_err(PythonError::from)? .to_pyarrow(py) diff --git a/python/tests/test_datafusion_read.py b/python/tests/test_datafusion_read.py index 198a65c..5f91047 100644 --- a/python/tests/test_datafusion_read.py +++ b/python/tests/test_datafusion_read.py @@ -27,6 +27,6 @@ def test_datafusion_table_registry(get_sample_table): table_path, [("hoodie.read.use.read_optimized.mode", "true")] ) ctx = SessionContext() - ctx.register_table("trips", table) + ctx.register_table_provider("trips", table) df = ctx.sql("SELECT city from trips order by city desc limit 1").to_arrow_table() assert df.to_pylist() == [{"city": "sao_paulo"}]
