This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 6bf80b107 chore: refactor planner read schema tests (#1886)
6bf80b107 is described below

commit 6bf80b107cc1574cb7f259719d0aa203e387efc4
Author: Oleks V <comph...@users.noreply.github.com>
AuthorDate: Fri Jun 13 15:59:50 2025 -0700

    chore: refactor planner read schema tests (#1886)
---
 native/core/src/execution/planner.rs | 252 ++++++++++-------------------------
 1 file changed, 68 insertions(+), 184 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 0eb3d5cc2..5d5d39635 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -2553,7 +2553,7 @@ mod tests {
     use futures::{poll, StreamExt};
     use std::{sync::Arc, task::Poll};
 
-    use arrow::array::{Array, DictionaryArray, Int32Array, StringArray};
+    use arrow::array::{Array, DictionaryArray, Int32Array, RecordBatch, StringArray};
     use arrow::datatypes::{DataType, Field, Fields, Schema};
     use datafusion::catalog::memory::DataSourceExec;
     use datafusion::datasource::listing::PartitionedFile;
@@ -3133,49 +3133,30 @@ mod tests {
         });
     }
 
-    /*
-    Testing a nested types scenario
-
-    select arr[0].a, arr[0].c from (
-        select array(named_struct('a', 1, 'b', 'n', 'c', 'x')) arr)
-     */
-    #[tokio::test]
-    async fn test_nested_types_list_of_struct_by_index() -> Result<(), DataFusionError> {
+    /// Executes the `test_data_query` SQL query,
+    /// saves the result into a temp folder in Parquet format,
+    /// and reads the file back into memory using the custom `read_schema`.
+    async fn make_parquet_data(
+        test_data_query: &str,
+        read_schema: Schema,
+    ) -> Result<RecordBatch, DataFusionError> {
         let session_ctx = SessionContext::new();
 
         // generate test data in the temp folder
-        let test_data = "select make_array(named_struct('a', 1, 'b', 'n', 'c', 'x')) c0";
         let tmp_dir = TempDir::new()?;
         let test_path = tmp_dir.path().to_str().unwrap().to_string();
 
         let plan = session_ctx
-            .sql(test_data)
+            .sql(test_data_query)
             .await?
             .create_physical_plan()
             .await?;
 
-        // Write parquet file into temp folder
+        // Write a parquet file into temp folder
         session_ctx
             .write_parquet(plan, test_path.clone(), None)
             .await?;
 
-        // Define schema Comet reads with
-        let required_schema = Schema::new(Fields::from(vec![Field::new(
-            "c0",
-            DataType::List(
-                Field::new(
-                    "element",
-                    DataType::Struct(Fields::from(vec![
-                        Field::new("a", DataType::Int32, true),
-                        Field::new("c", DataType::Utf8, true),
-                    ] as Vec<Field>)),
-                    true,
-                )
-                .into(),
-            ),
-            true,
-        )]));
-
         // Register all parquet with temp data as file groups
         let mut file_groups: Vec<FileGroup> = vec![];
         for entry in std::fs::read_dir(&test_path)? {
@@ -3200,16 +3181,44 @@ mod tests {
 
         let object_store_url = ObjectStoreUrl::local_filesystem();
         let file_scan_config =
-            FileScanConfigBuilder::new(object_store_url, required_schema.into(), source)
+            FileScanConfigBuilder::new(object_store_url, read_schema.into(), source)
                 .with_file_groups(file_groups)
                 .build();
 
         // Run native read
         let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));
-        let stream = scan.execute(0, session_ctx.task_ctx())?;
-        let result: Vec<_> = stream.collect().await;
+        let result: Vec<_> = scan.execute(0, session_ctx.task_ctx())?.collect().await;
+        Ok(result.first().unwrap().as_ref().unwrap().clone())
+    }
+
+    /*
+    Testing a nested types scenario
+
+    select arr[0].a, arr[0].c from (
+        select array(named_struct('a', 1, 'b', 'n', 'c', 'x')) arr)
+     */
+    #[tokio::test]
+    async fn test_nested_types_list_of_struct_by_index() -> Result<(), DataFusionError> {
+        let test_data = "select make_array(named_struct('a', 1, 'b', 'n', 'c', 'x')) c0";
+
+        // Define schema Comet reads with
+        let required_schema = Schema::new(Fields::from(vec![Field::new(
+            "c0",
+            DataType::List(
+                Field::new(
+                    "element",
+                    DataType::Struct(Fields::from(vec![
+                        Field::new("a", DataType::Int32, true),
+                        Field::new("c", DataType::Utf8, true),
+                    ] as Vec<Field>)),
+                    true,
+                )
+                .into(),
+            ),
+            true,
+        )]));
 
-        let actual = result.first().unwrap().as_ref().unwrap();
+        let actual = make_parquet_data(test_data, required_schema).await?;
 
         let expected = [
             "+----------------+",
@@ -3218,7 +3227,7 @@ mod tests {
             "| [{a: 1, c: x}] |",
             "+----------------+",
         ];
-        assert_batches_eq!(expected, &[actual.clone()]);
+        assert_batches_eq!(expected, &[actual]);
 
         Ok(())
     }
@@ -3231,47 +3240,7 @@ mod tests {
      */
     #[tokio::test]
     async fn test_nested_types_map_keys() -> Result<(), DataFusionError> {
-        let session_ctx = SessionContext::new();
-
-        // generate test data in the temp folder
         let test_data = "select map([named_struct('a', 1, 'b', 'n', 'c', 'x')], [named_struct('a', 2, 'b', 'm', 'c', 'y')]) c0";
-        let tmp_dir = TempDir::new()?;
-        let test_path = tmp_dir.path().to_str().unwrap().to_string();
-
-        let plan = session_ctx
-            .sql(test_data)
-            .await?
-            .create_physical_plan()
-            .await?;
-
-        // Write a parquet file into temp folder
-        session_ctx
-            .write_parquet(plan, test_path.clone(), None)
-            .await?;
-
-        // Register all parquet with temp data as file groups
-        let mut file_groups: Vec<FileGroup> = vec![];
-        for entry in std::fs::read_dir(&test_path)? {
-            let entry = entry?;
-            let path = entry.path();
-
-            if path.extension().and_then(|ext| ext.to_str()) == Some("parquet") {
-                if let Some(path_str) = path.to_str() {
-                    file_groups.push(FileGroup::new(vec![PartitionedFile::from_path(
-                        path_str.into(),
-                    )?]));
-                }
-            }
-        }
-
-        let source = ParquetSource::default().with_schema_adapter_factory(Arc::new(
-            SparkSchemaAdapterFactory::new(
-                SparkParquetOptions::new(EvalMode::Ansi, "", false),
-                None,
-            ),
-        ))?;
-
-        // Define schema Comet reads with
         let required_schema = Schema::new(Fields::from(vec![Field::new(
             "c0",
             DataType::Map(
@@ -3305,19 +3274,7 @@ mod tests {
             true,
         )]));
 
-        let object_store_url = ObjectStoreUrl::local_filesystem();
-        let file_scan_config =
-            FileScanConfigBuilder::new(object_store_url, required_schema.into(), source)
-                .with_file_groups(file_groups)
-                .build();
-
-        // Run native read
-        let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));
-        let stream = scan.execute(0, session_ctx.task_ctx())?;
-        let result: Vec<_> = stream.collect().await;
-
-        let actual = result.first().unwrap().as_ref().unwrap();
-
+        let actual = make_parquet_data(test_data, required_schema).await?;
         let expected = [
             "+------------------------------+",
             "| c0                           |",
@@ -3330,51 +3287,12 @@ mod tests {
         Ok(())
     }
 
+    // Read a struct using a schema whose fields do not overlap with
+    // the struct's fields
     #[tokio::test]
-    async fn test_nested_types_extract_missing_struct_names() -> Result<(), DataFusionError> {
-        let session_ctx = SessionContext::new();
-
-        // generate test data in the temp folder
+    async fn test_nested_types_extract_missing_struct_names_non_overlap(
+    ) -> Result<(), DataFusionError> {
         let test_data = "select named_struct('a', 1, 'b', 'abc') c0";
-        let tmp_dir = TempDir::new()?;
-        let test_path = tmp_dir.path().to_str().unwrap().to_string();
-
-        let plan = session_ctx
-            .sql(test_data)
-            .await?
-            .create_physical_plan()
-            .await?;
-
-        // Write a parquet file into temp folder
-        session_ctx
-            .write_parquet(plan, test_path.clone(), None)
-            .await?;
-
-        // Register all parquet with temp data as file groups
-        let mut file_groups: Vec<FileGroup> = vec![];
-        for entry in std::fs::read_dir(&test_path)? {
-            let entry = entry?;
-            let path = entry.path();
-
-            if path.extension().and_then(|ext| ext.to_str()) == Some("parquet") {
-                if let Some(path_str) = path.to_str() {
-                    file_groups.push(FileGroup::new(vec![PartitionedFile::from_path(
-                        path_str.into(),
-                    )?]));
-                }
-            }
-        }
-
-        let source = ParquetSource::default().with_schema_adapter_factory(Arc::new(
-            SparkSchemaAdapterFactory::new(
-                SparkParquetOptions::new(EvalMode::Ansi, "", false),
-                None,
-            ),
-        ))?;
-
-        let object_store_url = ObjectStoreUrl::local_filesystem();
-
-        // Define schema Comet reads with
         let required_schema = Schema::new(Fields::from(vec![Field::new(
             "c0",
             DataType::Struct(Fields::from(vec![
@@ -3383,47 +3301,23 @@ mod tests {
             ])),
             true,
         )]));
-
-        let file_scan_config = FileScanConfigBuilder::new(
-            object_store_url.clone(),
-            required_schema.into(),
-            Arc::clone(&source),
-        )
-        .with_file_groups(file_groups.clone())
-        .build();
-
-        // Run native read
-        let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));
-        let stream = scan.execute(0, session_ctx.task_ctx())?;
-        let result: Vec<_> = stream.collect().await;
-
-        let actual = result.first().unwrap().as_ref().unwrap();
-
+        let actual = make_parquet_data(test_data, required_schema).await?;
         let expected = ["+----+", "| c0 |", "+----+", "|    |", "+----+"];
-        assert_batches_eq!(expected, &[actual.clone()]);
+        assert_batches_eq!(expected, &[actual]);
+        Ok(())
+    }
 
-        // Define schema Comet reads with
+    // Read struct using custom schema to read just a single field from the struct
+    #[tokio::test]
+    async fn test_nested_types_extract_missing_struct_names_single_field(
+    ) -> Result<(), DataFusionError> {
+        let test_data = "select named_struct('a', 1, 'b', 'abc') c0";
         let required_schema = Schema::new(Fields::from(vec![Field::new(
             "c0",
             DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int64, true)])),
             true,
         )]));
-
-        let file_scan_config = FileScanConfigBuilder::new(
-            object_store_url.clone(),
-            required_schema.into(),
-            Arc::clone(&source),
-        )
-        .with_file_groups(file_groups.clone())
-        .build();
-
-        // Run native read
-        let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));
-        let stream = scan.execute(0, session_ctx.task_ctx())?;
-        let result: Vec<_> = stream.collect().await;
-
-        let actual = result.first().unwrap().as_ref().unwrap();
-
+        let actual = make_parquet_data(test_data, required_schema).await?;
         let expected = [
             "+--------+",
             "| c0     |",
@@ -3431,9 +3325,15 @@ mod tests {
             "| {a: 1} |",
             "+--------+",
         ];
-        assert_batches_eq!(expected, &[actual.clone()]);
+        assert_batches_eq!(expected, &[actual]);
+        Ok(())
+    }
 
-        // Define schema Comet reads with
+    // Read struct using custom schema to handle a missing field
+    #[tokio::test]
+    async fn test_nested_types_extract_missing_struct_names_missing_field(
+    ) -> Result<(), DataFusionError> {
+        let test_data = "select named_struct('a', 1, 'b', 'abc') c0";
         let required_schema = Schema::new(Fields::from(vec![Field::new(
             "c0",
             DataType::Struct(Fields::from(vec![
@@ -3442,22 +3342,7 @@ mod tests {
             ])),
             true,
         )]));
-
-        let file_scan_config = FileScanConfigBuilder::new(
-            object_store_url.clone(),
-            required_schema.into(),
-            Arc::clone(&source),
-        )
-        .with_file_groups(file_groups.clone())
-        .build();
-
-        // Run native read
-        let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config.clone())));
-        let stream = scan.execute(0, session_ctx.task_ctx())?;
-        let result: Vec<_> = stream.collect().await;
-
-        let actual = result.first().unwrap().as_ref().unwrap();
-
+        let actual = make_parquet_data(test_data, required_schema).await?;
         let expected = [
             "+-------------+",
             "| c0          |",
@@ -3465,8 +3350,7 @@ mod tests {
             "| {a: 1, x: } |",
             "+-------------+",
         ];
-        assert_batches_eq!(expected, &[actual.clone()]);
-
+        assert_batches_eq!(expected, &[actual]);
         Ok(())
     }
 }

