This is an automated email from the ASF dual-hosted git repository. xushiyan pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push: new 2d94b74 fix: handle schema retrieval for datafusion api (#187) 2d94b74 is described below commit 2d94b74936f62ea32feb0ef0204098d41fd55bf2 Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com> AuthorDate: Tue Nov 19 21:46:48 2024 -1000 fix: handle schema retrieval for datafusion api (#187) --- crates/datafusion/src/lib.rs | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs index 739cc90..cd82220 100644 --- a/crates/datafusion/src/lib.rs +++ b/crates/datafusion/src/lib.rs @@ -23,7 +23,7 @@ use std::fmt::Debug; use std::sync::Arc; use std::thread; -use arrow_schema::SchemaRef; +use arrow_schema::{Schema, SchemaRef}; use async_trait::async_trait; use datafusion::catalog::{Session, TableProviderFactory}; use datafusion::datasource::listing::PartitionedFile; @@ -86,7 +86,8 @@ impl TableProvider for HudiDataSource { let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { table.get_schema().await }) }); - SchemaRef::from(handle.join().unwrap().unwrap()) + let result = handle.join().unwrap().unwrap_or_else(|_| Schema::empty()); + SchemaRef::from(result) } fn table_type(&self) -> TableType { @@ -212,7 +213,7 @@ mod tests { use hudi_core::config::read::HudiReadConfig::InputPartitions; use hudi_tests::TestTable::{ - V6ComplexkeygenHivestyle, V6Nonpartitioned, V6SimplekeygenHivestyleNoMetafields, + V6ComplexkeygenHivestyle, V6Empty, V6Nonpartitioned, V6SimplekeygenHivestyleNoMetafields, V6SimplekeygenNonhivestyle, V6SimplekeygenNonhivestyleOverwritetable, V6TimebasedkeygenNonhivestyle, }; @@ -231,6 +232,16 @@ mod tests { assert_eq!(hudi.get_input_partitions(), 0) } + #[tokio::test] + async fn test_get_empty_schema_from_empty_table() { + let table_provider = + HudiDataSource::new_with_options(V6Empty.path().as_str(), empty_options()) + .await + .unwrap(); + let schema = table_provider.schema(); + assert!(schema.fields().is_empty()); + } + async fn register_test_table_with_session<I, K, V>( test_table: &TestTable, options: I, @@ -345,7 +356,7 @@ mod tests { } #[tokio::test] - async fn datafusion_read_hudi_table() { + async fn test_datafusion_read_hudi_table() { for (test_table, use_sql, planned_input_partitions) in &[ (V6ComplexkeygenHivestyle, true, 2), (V6Nonpartitioned, true, 1), @@ -386,7 +397,7 @@ mod tests { } #[tokio::test] - async fn datafusion_read_hudi_table_with_replacecommits() { + async fn test_datafusion_read_hudi_table_with_replacecommits() { for (test_table, use_sql, planned_input_partitions) in &[(V6SimplekeygenNonhivestyleOverwritetable, true, 1)] {