This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new 6bf80b107 chore: refactor planner read schema tests (#1886)
6bf80b107 is described below

commit 6bf80b107cc1574cb7f259719d0aa203e387efc4
Author: Oleks V <comph...@users.noreply.github.com>
AuthorDate: Fri Jun 13 15:59:50 2025 -0700

    chore: refactor planner read schema tests (#1886)
---
 native/core/src/execution/planner.rs | 252 ++++++++++-------------------------
 1 file changed, 68 insertions(+), 184 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 0eb3d5cc2..5d5d39635 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -2553,7 +2553,7 @@ mod tests {
     use futures::{poll, StreamExt};
     use std::{sync::Arc, task::Poll};
 
-    use arrow::array::{Array, DictionaryArray, Int32Array, StringArray};
+    use arrow::array::{Array, DictionaryArray, Int32Array, RecordBatch, StringArray};
     use arrow::datatypes::{DataType, Field, Fields, Schema};
     use datafusion::catalog::memory::DataSourceExec;
     use datafusion::datasource::listing::PartitionedFile;
@@ -3133,49 +3133,30 @@ mod tests {
         });
     }
 
-    /*
-    Testing a nested types scenario
-
-    select arr[0].a, arr[0].c from (
-        select array(named_struct('a', 1, 'b', 'n', 'c', 'x')) arr)
-    */
-    #[tokio::test]
-    async fn test_nested_types_list_of_struct_by_index() -> Result<(), DataFusionError> {
+    /// Executes a `test_data_query` SQL query
+    /// and saves the result into a temp folder using parquet format
+    /// Read the file back to the memory using a custom schema
+    async fn make_parquet_data(
+        test_data_query: &str,
+        read_schema: Schema,
+    ) -> Result<RecordBatch, DataFusionError> {
         let session_ctx = SessionContext::new();
 
         // generate test data in the temp folder
-        let test_data = "select make_array(named_struct('a', 1, 'b', 'n', 'c', 'x')) c0";
         let tmp_dir = TempDir::new()?;
         let test_path = tmp_dir.path().to_str().unwrap().to_string();
 
         let plan = session_ctx
-            .sql(test_data)
+            .sql(test_data_query)
             .await?
             .create_physical_plan()
             .await?;
 
-        // Write parquet file into temp folder
+        // Write a parquet file into temp folder
         session_ctx
             .write_parquet(plan, test_path.clone(), None)
             .await?;
 
-        // Define schema Comet reads with
-        let required_schema = Schema::new(Fields::from(vec![Field::new(
-            "c0",
-            DataType::List(
-                Field::new(
-                    "element",
-                    DataType::Struct(Fields::from(vec![
-                        Field::new("a", DataType::Int32, true),
-                        Field::new("c", DataType::Utf8, true),
-                    ] as Vec<Field>)),
-                    true,
-                )
-                .into(),
-            ),
-            true,
-        )]));
-
         // Register all parquet with temp data as file groups
         let mut file_groups: Vec<FileGroup> = vec![];
         for entry in std::fs::read_dir(&test_path)? {
@@ -3200,16 +3181,44 @@ mod tests {
         let object_store_url = ObjectStoreUrl::local_filesystem();
 
         let file_scan_config =
-            FileScanConfigBuilder::new(object_store_url, required_schema.into(), source)
+            FileScanConfigBuilder::new(object_store_url, read_schema.into(), source)
                 .with_file_groups(file_groups)
                 .build();
 
         // Run native read
         let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));
-        let stream = scan.execute(0, session_ctx.task_ctx())?;
-        let result: Vec<_> = stream.collect().await;
+        let result: Vec<_> = scan.execute(0, session_ctx.task_ctx())?.collect().await;
+        Ok(result.first().unwrap().as_ref().unwrap().clone())
+    }
+
+    /*
+    Testing a nested types scenario
+
+    select arr[0].a, arr[0].c from (
+        select array(named_struct('a', 1, 'b', 'n', 'c', 'x')) arr)
+    */
+    #[tokio::test]
+    async fn test_nested_types_list_of_struct_by_index() -> Result<(), DataFusionError> {
+        let test_data = "select make_array(named_struct('a', 1, 'b', 'n', 'c', 'x')) c0";
+
+        // Define schema Comet reads with
+        let required_schema = Schema::new(Fields::from(vec![Field::new(
+            "c0",
+            DataType::List(
+                Field::new(
+                    "element",
+                    DataType::Struct(Fields::from(vec![
+                        Field::new("a", DataType::Int32, true),
+                        Field::new("c", DataType::Utf8, true),
+                    ] as Vec<Field>)),
+                    true,
+                )
+                .into(),
+            ),
+            true,
+        )]));
 
-        let actual = result.first().unwrap().as_ref().unwrap();
+        let actual = make_parquet_data(test_data, required_schema).await?;
 
         let expected = [
             "+----------------+",
@@ -3218,7 +3227,7 @@ mod tests {
             "| [{a: 1, c: x}] |",
             "+----------------+",
         ];
-        assert_batches_eq!(expected, &[actual.clone()]);
+        assert_batches_eq!(expected, &[actual]);
 
         Ok(())
     }
@@ -3231,47 +3240,7 @@ mod tests {
     */
     #[tokio::test]
     async fn test_nested_types_map_keys() -> Result<(), DataFusionError> {
-        let session_ctx = SessionContext::new();
-
-        // generate test data in the temp folder
         let test_data = "select map([named_struct('a', 1, 'b', 'n', 'c', 'x')], [named_struct('a', 2, 'b', 'm', 'c', 'y')]) c0";
-        let tmp_dir = TempDir::new()?;
-        let test_path = tmp_dir.path().to_str().unwrap().to_string();
-
-        let plan = session_ctx
-            .sql(test_data)
-            .await?
-            .create_physical_plan()
-            .await?;
-
-        // Write a parquet file into temp folder
-        session_ctx
-            .write_parquet(plan, test_path.clone(), None)
-            .await?;
-
-        // Register all parquet with temp data as file groups
-        let mut file_groups: Vec<FileGroup> = vec![];
-        for entry in std::fs::read_dir(&test_path)? {
-            let entry = entry?;
-            let path = entry.path();
-
-            if path.extension().and_then(|ext| ext.to_str()) == Some("parquet") {
-                if let Some(path_str) = path.to_str() {
-                    file_groups.push(FileGroup::new(vec![PartitionedFile::from_path(
-                        path_str.into(),
-                    )?]));
-                }
-            }
-        }
-
-        let source = ParquetSource::default().with_schema_adapter_factory(Arc::new(
-            SparkSchemaAdapterFactory::new(
-                SparkParquetOptions::new(EvalMode::Ansi, "", false),
-                None,
-            ),
-        ))?;
-
-        // Define schema Comet reads with
         let required_schema = Schema::new(Fields::from(vec![Field::new(
             "c0",
             DataType::Map(
@@ -3305,19 +3274,7 @@ mod tests {
             true,
         )]));
 
-        let object_store_url = ObjectStoreUrl::local_filesystem();
-        let file_scan_config =
-            FileScanConfigBuilder::new(object_store_url, required_schema.into(), source)
-                .with_file_groups(file_groups)
-                .build();
-
-        // Run native read
-        let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));
-        let stream = scan.execute(0, session_ctx.task_ctx())?;
-        let result: Vec<_> = stream.collect().await;
-
-        let actual = result.first().unwrap().as_ref().unwrap();
-
+        let actual = make_parquet_data(test_data, required_schema).await?;
         let expected = [
             "+------------------------------+",
             "| c0                           |",
             "+------------------------------+",
             "| {{a: 1, c: x}: {a: 2, c: y}} |",
             "+------------------------------+",
         ];
@@ -3330,51 +3287,12 @@ mod tests {
         Ok(())
     }
 
+    // Read struct using schema where schema fields do not overlap with
+    // struct fields
     #[tokio::test]
-    async fn test_nested_types_extract_missing_struct_names() -> Result<(), DataFusionError> {
-        let session_ctx = SessionContext::new();
-
-        // generate test data in the temp folder
+    async fn test_nested_types_extract_missing_struct_names_non_overlap(
+    ) -> Result<(), DataFusionError> {
         let test_data = "select named_struct('a', 1, 'b', 'abc') c0";
-        let tmp_dir = TempDir::new()?;
-        let test_path = tmp_dir.path().to_str().unwrap().to_string();
-
-        let plan = session_ctx
-            .sql(test_data)
-            .await?
-            .create_physical_plan()
-            .await?;
-
-        // Write a parquet file into temp folder
-        session_ctx
-            .write_parquet(plan, test_path.clone(), None)
-            .await?;
-
-        // Register all parquet with temp data as file groups
-        let mut file_groups: Vec<FileGroup> = vec![];
-        for entry in std::fs::read_dir(&test_path)? {
-            let entry = entry?;
-            let path = entry.path();
-
-            if path.extension().and_then(|ext| ext.to_str()) == Some("parquet") {
-                if let Some(path_str) = path.to_str() {
-                    file_groups.push(FileGroup::new(vec![PartitionedFile::from_path(
-                        path_str.into(),
-                    )?]));
-                }
-            }
-        }
-
-        let source = ParquetSource::default().with_schema_adapter_factory(Arc::new(
-            SparkSchemaAdapterFactory::new(
-                SparkParquetOptions::new(EvalMode::Ansi, "", false),
-                None,
-            ),
-        ))?;
-
-        let object_store_url = ObjectStoreUrl::local_filesystem();
-
-        // Define schema Comet reads with
         let required_schema = Schema::new(Fields::from(vec![Field::new(
             "c0",
             DataType::Struct(Fields::from(vec![
                 Field::new("x", DataType::Int32, true),
                 Field::new("y", DataType::Utf8, true),
@@ -3383,47 +3301,23 @@ mod tests {
             ])),
             true,
         )]));
-
-        let file_scan_config = FileScanConfigBuilder::new(
-            object_store_url.clone(),
-            required_schema.into(),
-            Arc::clone(&source),
-        )
-        .with_file_groups(file_groups.clone())
-        .build();
-
-        // Run native read
-        let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));
-        let stream = scan.execute(0, session_ctx.task_ctx())?;
-        let result: Vec<_> = stream.collect().await;
-
-        let actual = result.first().unwrap().as_ref().unwrap();
-
+        let actual = make_parquet_data(test_data, required_schema).await?;
         let expected = ["+----+", "| c0 |", "+----+", "|    |", "+----+"];
-        assert_batches_eq!(expected, &[actual.clone()]);
+        assert_batches_eq!(expected, &[actual]);
+        Ok(())
+    }
 
-        // Define schema Comet reads with
+    // Read struct using custom schema to read just a single field from the struct
+    #[tokio::test]
+    async fn test_nested_types_extract_missing_struct_names_single_field(
+    ) -> Result<(), DataFusionError> {
+        let test_data = "select named_struct('a', 1, 'b', 'abc') c0";
         let required_schema = Schema::new(Fields::from(vec![Field::new(
             "c0",
             DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int64, true)])),
             true,
         )]));
-
-        let file_scan_config = FileScanConfigBuilder::new(
-            object_store_url.clone(),
-            required_schema.into(),
-            Arc::clone(&source),
-        )
-        .with_file_groups(file_groups.clone())
-        .build();
-
-        // Run native read
-        let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));
-        let stream = scan.execute(0, session_ctx.task_ctx())?;
-        let result: Vec<_> = stream.collect().await;
-
-        let actual = result.first().unwrap().as_ref().unwrap();
-
+        let actual = make_parquet_data(test_data, required_schema).await?;
         let expected = [
             "+--------+",
             "| c0     |",
             "+--------+",
@@ -3431,9 +3325,15 @@ mod tests {
             "| {a: 1} |",
             "+--------+",
         ];
-        assert_batches_eq!(expected, &[actual.clone()]);
+        assert_batches_eq!(expected, &[actual]);
+        Ok(())
+    }
 
-        // Define schema Comet reads with
+    // Read struct using custom schema to handle a missing field
+    #[tokio::test]
+    async fn test_nested_types_extract_missing_struct_names_missing_field(
+    ) -> Result<(), DataFusionError> {
+        let test_data = "select named_struct('a', 1, 'b', 'abc') c0";
        let required_schema = Schema::new(Fields::from(vec![Field::new(
             "c0",
             DataType::Struct(Fields::from(vec![
                 Field::new("a", DataType::Int64, true),
                 Field::new("x", DataType::Utf8, true),
@@ -3442,22 +3342,7 @@ mod tests {
             ])),
             true,
         )]));
-
-        let file_scan_config = FileScanConfigBuilder::new(
-            object_store_url.clone(),
-            required_schema.into(),
-            Arc::clone(&source),
-        )
-        .with_file_groups(file_groups.clone())
-        .build();
-
-        // Run native read
-        let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));
-        let stream = scan.execute(0, session_ctx.task_ctx())?;
-        let result: Vec<_> = stream.collect().await;
-
-        let actual = result.first().unwrap().as_ref().unwrap();
-
+        let actual = make_parquet_data(test_data, required_schema).await?;
         let expected = [
             "+-------------+",
             "| c0          |",
             "+-------------+",
@@ -3465,8 +3350,7 @@ mod tests {
             "| {a: 1, x: } |",
             "+-------------+",
         ];
-        assert_batches_eq!(expected, &[actual.clone()]);
-
+        assert_batches_eq!(expected, &[actual]);
         Ok(())
     }
 }
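With this refactor, each planner read-schema test reduces to a SQL literal, a required schema, and a single call into the shared helper. As a minimal sketch, a hypothetical follow-up test in this family could look like the following (the test name, query, and expected output are illustrative assumptions, not part of the commit; it relies on the `make_parquet_data` helper and the test imports from the diff above):

    // Hypothetical example only: a new read-schema case is now a SQL literal,
    // a required schema, and one helper call.
    #[tokio::test]
    async fn test_read_plain_int_column() -> Result<(), DataFusionError> {
        let test_data = "select 1 c0";
        let required_schema = Schema::new(Fields::from(vec![Field::new(
            "c0",
            DataType::Int64,
            true,
        )]));
        let actual = make_parquet_data(test_data, required_schema).await?;
        let expected = ["+----+", "| c0 |", "+----+", "| 1  |", "+----+"];
        assert_batches_eq!(expected, &[actual]);
        Ok(())
    }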
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org