kosiew commented on PR #15295:
URL: https://github.com/apache/datafusion/pull/15295#issuecomment-2750776919

   hi @TheBuilderJR ,
   
   
   I looked at #15259 and tried incorporating its `struct_coercion`, but the amended `struct_coercion` caused the `sqllogictest` suite to fail, so I did not proceed further.
   
   Here's a script that runs the test case from #14757.
   The main steps are:
   1. create the schema adapter factory (the new `NestedStructSchemaAdapterFactory` from this PR)
   2. plug it into the `ListingTableConfig`
   
   `ListingTable` then takes care of mapping the schemas of file1 and file2 to schema2. (A distilled sketch of just these two steps follows the full script.)
   
   
   ```rust
   use datafusion::arrow::array::{
       Array, Float64Array, StringArray, StructArray, TimestampMillisecondArray,
   };
   use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::dataframe::DataFrameWriteOptions;
   use datafusion::datasource::file_format::parquet::ParquetFormat;
   use datafusion::datasource::listing::{
       ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
   };
    use datafusion::datasource::nested_schema_adapter::NestedStructSchemaAdapterFactory;
   use datafusion::datasource::schema_adapter::SchemaAdapterFactory;
   use datafusion::prelude::*;
   use std::error::Error;
   use std::fs;
   use std::sync::Arc;
   
    async fn test_datafusion_schema_evolution_with_compaction() -> Result<(), Box<dyn Error>> {
       let ctx = SessionContext::new();
   
       let schema1 = create_schema1();
       let schema2 = create_schema2();
   
       let batch1 = create_batch1(&schema1)?;
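
        // Step 1: create the schema adapter factory that maps each file's
        // (possibly narrower) nested schema onto the table schema.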
       let adapter_factory: Arc<dyn SchemaAdapterFactory> =
           Arc::new(NestedStructSchemaAdapterFactory {});
   
       let path1 = "test_data1.parquet";
       let _ = fs::remove_file(path1);
   
       let df1 = ctx.read_batch(batch1)?;
       df1.write_parquet(
           path1,
           DataFrameWriteOptions::default()
               .with_single_file_output(true)
               .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
           None,
       )
       .await?;
   
       let batch2 = create_batch2(&schema2)?;
   
       let path2 = "test_data2.parquet";
       let _ = fs::remove_file(path2);
   
       let df2 = ctx.read_batch(batch2)?;
       df2.write_parquet(
           path2,
           DataFrameWriteOptions::default()
               .with_single_file_output(true)
               .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
           None,
       )
       .await?;
   
       let paths_str = vec![path1.to_string(), path2.to_string()];
   
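        // Step 2: plug the adapter factory into the ListingTableConfig, together with
        // the merged schema (schema2), so ListingTable can adapt file1 and file2 to it.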
       let config = ListingTableConfig::new_with_multi_paths(
           paths_str
               .into_iter()
               .map(|p| ListingTableUrl::parse(&p))
               .collect::<Result<Vec<_>, _>>()?,
       )
       .with_schema(schema2.as_ref().clone().into())
       .with_schema_adapter_factory(adapter_factory);
   
        // Build a merged configuration: `infer` rebuilds the config, so re-attach the
        // schema_adapter_factory from the original config and set the sort order explicitly.
       let inferred_config = config.clone().infer(&ctx.state()).await?;
       let config = ListingTableConfig {
           schema_adapter_factory: config.schema_adapter_factory.clone(),
           options: Some(ListingOptions {
                file_sort_order: vec![vec![col("timestamp_utc").sort(true, true)]],
               ..inferred_config.options.unwrap_or_else(|| {
                   ListingOptions::new(Arc::new(ParquetFormat::default()))
               })
           }),
           ..inferred_config
       };
       let listing_table = ListingTable::try_new(config)?;
   
       ctx.register_table("events", Arc::new(listing_table))?;
   
       let df = ctx
           .sql("SELECT * FROM events ORDER BY timestamp_utc")
           .await?;
       let results = match df.clone().collect().await {
           Ok(res) => res,
           Err(e) => {
               println!("Error collecting results: {}", e);
               remove_data_files(path1, path2);
               return Err(Box::new(e));
           }
       };
   
       assert_eq!(results[0].num_rows(), 2);
   
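        // Compact: rewrite the merged query result as a single parquet file in schema2.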
       let compacted_path = "test_data_compacted.parquet";
       let _ = fs::remove_file(compacted_path);
   
       df.write_parquet(
           compacted_path,
           DataFrameWriteOptions::default()
               .with_single_file_output(true)
               .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
           None,
       )
       .await?;
   
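        // Re-read the compacted file with a fresh context; no schema adapter is
        // needed here because the compacted file already carries the full schema2.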
       let new_ctx = SessionContext::new();
        let config = ListingTableConfig::new_with_multi_paths(vec![ListingTableUrl::parse(
           compacted_path,
       )?])
       .with_schema(schema2.as_ref().clone().into())
       .infer(&new_ctx.state())
       .await?;
   
       let listing_table = ListingTable::try_new(config)?;
       new_ctx.register_table("events", Arc::new(listing_table))?;
   
       let df = new_ctx
           .sql("SELECT * FROM events ORDER BY timestamp_utc")
           .await?;
       let compacted_results = df.collect().await;
   
       remove_data_files(path1, path2);
   
       let _ = fs::remove_file(compacted_path);
   
       let compacted_results = compacted_results?;
   
       assert_eq!(compacted_results[0].num_rows(), 2);
       assert_eq!(results, compacted_results);
   
       Ok(())
   }
   
   fn remove_data_files(path1: &str, path2: &str) {
       let _ = fs::remove_file(path1);
       let _ = fs::remove_file(path2);
   }
   
   fn create_schema2() -> Arc<Schema> {
       let schema2 = Arc::new(Schema::new(vec![
           Field::new("component", DataType::Utf8, true),
           Field::new("message", DataType::Utf8, true),
           Field::new("stack", DataType::Utf8, true),
           Field::new("timestamp", DataType::Utf8, true),
           Field::new(
               "timestamp_utc",
               DataType::Timestamp(TimeUnit::Millisecond, None),
               true,
           ),
           Field::new(
               "additionalInfo",
               DataType::Struct(
                   vec![
                       Field::new("location", DataType::Utf8, true),
                       Field::new(
                           "timestamp_utc",
                           DataType::Timestamp(TimeUnit::Millisecond, None),
                           true,
                       ),
                       Field::new(
                           "reason",
                           DataType::Struct(
                               vec![
                                    Field::new("_level", DataType::Float64, true),
                                   Field::new(
                                       "details",
                                       DataType::Struct(
                                           vec![
                                                Field::new("rurl", DataType::Utf8, true),
                                                Field::new("s", DataType::Float64, true),
                                                Field::new("t", DataType::Utf8, true),
                                           ]
                                           .into(),
                                       ),
                                       true,
                                   ),
                               ]
                               .into(),
                           ),
                           true,
                       ),
                   ]
                   .into(),
               ),
               true,
           ),
       ]));
       schema2
   }
   
    fn create_batch1(schema1: &Arc<Schema>) -> Result<RecordBatch, Box<dyn Error>> {
       let batch1 = RecordBatch::try_new(
           schema1.clone(),
           vec![
               Arc::new(StringArray::from(vec![Some("component1")])),
               Arc::new(StringArray::from(vec![Some("message1")])),
               Arc::new(StringArray::from(vec![Some("stack_trace")])),
                Arc::new(StringArray::from(vec![Some("2025-02-18T00:00:00Z")])),
                Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
               Arc::new(StructArray::from(vec![
                   (
                       Arc::new(Field::new("location", DataType::Utf8, true)),
                        Arc::new(StringArray::from(vec![Some("USA")])) as Arc<dyn Array>,
                   ),
                   (
                       Arc::new(Field::new(
                           "timestamp_utc",
                           DataType::Timestamp(TimeUnit::Millisecond, None),
                           true,
                        )),
                        Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
                   ),
               ])),
           ],
       )?;
       Ok(batch1)
   }
   
   fn create_schema1() -> Arc<Schema> {
       let schema1 = Arc::new(Schema::new(vec![
           Field::new("component", DataType::Utf8, true),
           Field::new("message", DataType::Utf8, true),
           Field::new("stack", DataType::Utf8, true),
           Field::new("timestamp", DataType::Utf8, true),
           Field::new(
               "timestamp_utc",
               DataType::Timestamp(TimeUnit::Millisecond, None),
               true,
           ),
           Field::new(
               "additionalInfo",
               DataType::Struct(
                   vec![
                       Field::new("location", DataType::Utf8, true),
                       Field::new(
                           "timestamp_utc",
                           DataType::Timestamp(TimeUnit::Millisecond, None),
                           true,
                       ),
                   ]
                   .into(),
               ),
               true,
           ),
       ]));
       schema1
   }
   
    fn create_batch2(schema2: &Arc<Schema>) -> Result<RecordBatch, Box<dyn Error>> {
       let batch2 = RecordBatch::try_new(
           schema2.clone(),
           vec![
               Arc::new(StringArray::from(vec![Some("component1")])),
               Arc::new(StringArray::from(vec![Some("message1")])),
               Arc::new(StringArray::from(vec![Some("stack_trace")])),
                Arc::new(StringArray::from(vec![Some("2025-02-18T00:00:00Z")])),
                Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
               Arc::new(StructArray::from(vec![
                   (
                       Arc::new(Field::new("location", DataType::Utf8, true)),
                        Arc::new(StringArray::from(vec![Some("USA")])) as Arc<dyn Array>,
                   ),
                   (
                       Arc::new(Field::new(
                           "timestamp_utc",
                           DataType::Timestamp(TimeUnit::Millisecond, None),
                           true,
                        )),
                        Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
                   ),
                   (
                       Arc::new(Field::new(
                           "reason",
                           DataType::Struct(
                               vec![
                                    Field::new("_level", DataType::Float64, true),
                                   Field::new(
                                       "details",
                                       DataType::Struct(
                                           vec![
                                                Field::new("rurl", DataType::Utf8, true),
                                                Field::new("s", DataType::Float64, true),
                                                Field::new("t", DataType::Utf8, true),
                                           ]
                                           .into(),
                                       ),
                                       true,
                                   ),
                               ]
                               .into(),
                           ),
                           true,
                       )),
                       Arc::new(StructArray::from(vec![
                           (
                                Arc::new(Field::new("_level", DataType::Float64, true)),
                               Arc::new(Float64Array::from(vec![Some(1.5)]))
                                   as Arc<dyn Array>,
                           ),
                           (
                               Arc::new(Field::new(
                                   "details",
                                   DataType::Struct(
                                       vec![
                                            Field::new("rurl", DataType::Utf8, true),
                                            Field::new("s", DataType::Float64, true),
                                            Field::new("t", DataType::Utf8, true),
                                       ]
                                       .into(),
                                   ),
                                   true,
                               )),
                               Arc::new(StructArray::from(vec![
                                   (
                                        Arc::new(Field::new("rurl", DataType::Utf8, true)),
                                        Arc::new(StringArray::from(vec![Some(
                                            "https://example.com",
                                        )]))
                                            as Arc<dyn Array>,
                                   ),
                                   (
                                        Arc::new(Field::new("s", DataType::Float64, true)),
                                        Arc::new(Float64Array::from(vec![Some(3.14)]))
                                            as Arc<dyn Array>,
                                   ),
                                   (
                                        Arc::new(Field::new("t", DataType::Utf8, true)),
                                        Arc::new(StringArray::from(vec![Some("data")]))
                                            as Arc<dyn Array>,
                                   ),
                               ])),
                           ),
                       ])),
                   ),
               ])),
           ],
       )?;
       Ok(batch2)
   }
   
   fn main() -> Result<(), Box<dyn Error>> {
       // Create a Tokio runtime for running our async function
       let rt = tokio::runtime::Runtime::new()?;
   
       // Run the function in the runtime
        rt.block_on(async { test_datafusion_schema_evolution_with_compaction().await })?;
   
       println!("Example completed successfully!");
       Ok(())
   }
   ```
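
   For quick reference, here are the two key steps in isolation. This is a minimal sketch assuming the `with_schema_adapter_factory` API from this PR; `evolved_config`, the file paths, and `merged_schema` are placeholders for illustration:

    ```rust
    use std::sync::Arc;

    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::datasource::listing::{ListingTableConfig, ListingTableUrl};
    use datafusion::datasource::nested_schema_adapter::NestedStructSchemaAdapterFactory;
    use datafusion::datasource::schema_adapter::SchemaAdapterFactory;
    use datafusion::error::Result;

    /// Build a ListingTableConfig whose files are adapted to `merged_schema`
    /// (schema2 in the script above).
    fn evolved_config(paths: &[&str], merged_schema: SchemaRef) -> Result<ListingTableConfig> {
        // Step 1: the schema adapter factory that handles nested structs.
        let adapter_factory: Arc<dyn SchemaAdapterFactory> =
            Arc::new(NestedStructSchemaAdapterFactory {});

        let urls = paths
            .iter()
            .map(ListingTableUrl::parse)
            .collect::<Result<Vec<_>>>()?;

        // Step 2: plug the factory (and the merged schema) into the config.
        // Note: `infer` rebuilds the config, so re-attach the factory after
        // inference, as the script above does.
        Ok(ListingTableConfig::new_with_multi_paths(urls)
            .with_schema(merged_schema)
            .with_schema_adapter_factory(adapter_factory))
    }
    ```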

