kosiew commented on code in PR #20840:
URL: https://github.com/apache/datafusion/pull/20840#discussion_r2993841346


##########
datafusion/core/tests/parquet/expr_adapter.rs:
##########
@@ -54,6 +56,399 @@ async fn write_parquet(batch: RecordBatch, store: Arc<dyn 
ObjectStore>, path: &s
     store.put(&Path::from(path), data.into()).await.unwrap();
 }
 
+#[derive(Debug, Clone, Copy)]
+enum NestedListKind {
+    List,
+    LargeList,
+}
+
+impl NestedListKind {
+    fn field_data_type(self, item_field: Arc<Field>) -> DataType {
+        match self {
+            Self::List => DataType::List(item_field),
+            Self::LargeList => DataType::LargeList(item_field),
+        }
+    }
+
+    fn array(
+        self,
+        item_field: Arc<Field>,
+        lengths: Vec<usize>,
+        values: ArrayRef,
+    ) -> ArrayRef {
+        match self {
+            Self::List => Arc::new(ListArray::new(
+                item_field,
+                OffsetBuffer::<i32>::from_lengths(lengths),
+                values,
+                None,
+            )),
+            Self::LargeList => Arc::new(LargeListArray::new(
+                item_field,
+                OffsetBuffer::<i64>::from_lengths(lengths),
+                values,
+                None,
+            )),
+        }
+    }
+
+    fn name(self) -> &'static str {
+        match self {
+            Self::List => "list",
+            Self::LargeList => "large_list",
+        }
+    }
+}
+
+#[derive(Debug)]
+struct MessageValue<'a> {
+    id: i32,
+    name: &'a str,
+    chain: Option<&'a str>,
+    ignored: Option<i32>,
+}
+
+fn message_fields(
+    chain_type: DataType,
+    chain_nullable: bool,
+    include_chain: bool,
+    include_ignored: bool,
+) -> Fields {
+    let mut fields = vec![
+        Arc::new(Field::new("id", DataType::Int32, false)),
+        Arc::new(Field::new("name", DataType::Utf8, true)),
+    ];
+    if include_chain {
+        fields.push(Arc::new(Field::new("chain", chain_type, chain_nullable)));
+    }
+    if include_ignored {
+        fields.push(Arc::new(Field::new("ignored", DataType::Int32, true)));
+    }
+    fields.into()
+}
+
+// Helper to construct the target message schema for struct evolution tests.
+// The schema always has id (Int64), name (Utf8), and chain with parameterized 
type.
+fn target_message_fields(chain_type: DataType, chain_nullable: bool) -> Fields 
{
+    vec![
+        Arc::new(Field::new("id", DataType::Int64, false)),
+        Arc::new(Field::new("name", DataType::Utf8, true)),
+        Arc::new(Field::new("chain", chain_type, chain_nullable)),
+    ]
+    .into()
+}
+
+// Helper to build message columns in canonical order (id, name, chain, 
ignored)
+// based on which optional fields are present in the schema.
+fn build_message_columns(
+    id_array: &ArrayRef,
+    name_array: &ArrayRef,
+    chain_vec: &[Option<&str>],
+    ignored_array: &ArrayRef,
+    fields: &Fields,
+) -> Vec<ArrayRef> {
+    let mut columns = vec![Arc::clone(id_array), Arc::clone(name_array)];
+
+    for field in fields.iter().skip(2) {
+        match field.name().as_str() {
+            "chain" => {
+                let chain_array = match field.data_type() {
+                    DataType::Utf8 => {
+                        Arc::new(StringArray::from(chain_vec.to_vec())) as 
ArrayRef
+                    }
+                    DataType::Struct(chain_fields) => {
+                        let chain_struct = StructArray::new(
+                            chain_fields.clone(),
+                            
vec![Arc::new(StringArray::from(chain_vec.to_vec()))
+                                as ArrayRef],
+                            None,
+                        );
+                        Arc::new(chain_struct) as ArrayRef
+                    }
+                    other => panic!("unexpected chain field type: {other:?}"),
+                };
+                columns.push(chain_array);
+            }
+            "ignored" => columns.push(Arc::clone(ignored_array)),
+            _ => {}
+        }
+    }
+    columns
+}
+
+fn nested_messages_batch(
+    kind: NestedListKind,
+    row_id: i32,
+    messages: &[MessageValue<'_>],
+    fields: &Fields,
+) -> RecordBatch {
+    let item_field = Arc::new(Field::new("item", 
DataType::Struct(fields.clone()), true));
+
+    let (ids_vec, names_vec, chain_vec, ignored_vec) = messages.iter().fold(
+        (
+            Vec::with_capacity(messages.len()),
+            Vec::with_capacity(messages.len()),
+            Vec::with_capacity(messages.len()),
+            Vec::with_capacity(messages.len()),
+        ),
+        |(mut ids, mut names, mut chains, mut ignoreds), msg| {
+            ids.push(msg.id);
+            names.push(Some(msg.name));
+            chains.push(msg.chain);
+            ignoreds.push(msg.ignored);
+            (ids, names, chains, ignoreds)
+        },
+    );
+
+    // Build all arrays once
+    let id_array = Arc::new(Int32Array::from(ids_vec)) as ArrayRef;
+    let name_array = Arc::new(StringArray::from(names_vec)) as ArrayRef;
+    let ignored_array = Arc::new(Int32Array::from(ignored_vec)) as ArrayRef;
+
+    // Build columns in canonical order (id, name, chain, ignored) based on 
field schema
+    let columns =
+        build_message_columns(&id_array, &name_array, &chain_vec, 
&ignored_array, fields);
+
+    let struct_array = StructArray::new(fields.clone(), columns, None);
+
+    // Compute the message data type first, then move item_field into 
kind.array()
+    let message_data_type = kind.field_data_type(item_field.clone());
+    let messages_array =
+        kind.array(item_field, vec![messages.len()], Arc::new(struct_array));
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("row_id", DataType::Int32, false),
+        Field::new("messages", message_data_type, true),
+    ]));
+
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(Int32Array::from(vec![row_id])) as ArrayRef,
+            messages_array,
+        ],
+    )
+    .unwrap()
+}
+
+async fn register_memory_listing_table(
+    ctx: &SessionContext,
+    store: Arc<dyn ObjectStore>,
+    base_path: &str,
+    table_schema: SchemaRef,
+) {
+    let store_url = ObjectStoreUrl::parse("memory://").unwrap();
+    ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
+
+    let listing_table_config =
+        ListingTableConfig::new(ListingTableUrl::parse(base_path).unwrap())
+            .infer_options(&ctx.state())
+            .await
+            .unwrap()
+            .with_schema(table_schema)
+            
.with_expr_adapter_factory(Arc::new(DefaultPhysicalExprAdapterFactory));
+
+    let table = ListingTable::try_new(listing_table_config).unwrap();
+    ctx.register_table("t", Arc::new(table)).unwrap();
+}
+
+fn test_context() -> SessionContext {
+    let mut cfg = SessionConfig::new()
+        .with_collect_statistics(false)
+        .with_parquet_pruning(false)
+        .with_parquet_page_index_pruning(false);
+    cfg.options_mut().execution.parquet.pushdown_filters = true;
+    SessionContext::new_with_config(cfg)
+}
+
+fn nested_list_table_schema(
+    kind: NestedListKind,
+    target_message_fields: Fields,
+) -> SchemaRef {
+    let target_item = Arc::new(Field::new(
+        "item",
+        DataType::Struct(target_message_fields),
+        true,
+    ));
+    Arc::new(Schema::new(vec![
+        Field::new("row_id", DataType::Int32, false),
+        Field::new("messages", kind.field_data_type(target_item), true),
+    ]))
+}
+
+// Helper to extract message values from a nested list column.
+// Returns the values at indices 0 and 1 from either a ListArray or 
LargeListArray.
+fn extract_nested_list_values(
+    kind: NestedListKind,
+    column: &ArrayRef,
+) -> (ArrayRef, ArrayRef) {
+    match kind {
+        NestedListKind::List => {
+            let list = column
+                .as_any()
+                .downcast_ref::<ListArray>()
+                .expect("messages should be a ListArray");
+            (list.value(0), list.value(1))
+        }
+        NestedListKind::LargeList => {
+            let list = column
+                .as_any()
+                .downcast_ref::<LargeListArray>()
+                .expect("messages should be a LargeListArray");
+            (list.value(0), list.value(1))
+        }
+    }
+}
+
+// Helper to set up a nested list test fixture.
+// Creates an in-memory store, writes the provided batches to parquet files,
+// creates a SessionContext, and registers the resulting table.
+// Returns the prepared context ready for queries.
+async fn setup_nested_list_test(
+    kind: NestedListKind,
+    prefix_base: &str,
+    batches: Vec<(String, RecordBatch)>,
+    table_schema: SchemaRef,
+) -> SessionContext {
+    let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+    let prefix = format!("{}_{}", kind.name(), prefix_base);
+
+    for (filename, batch) in batches {
+        write_parquet(batch, Arc::clone(&store), 
&format!("{prefix}/{filename}")).await;
+    }
+
+    let ctx = test_context();
+    register_memory_listing_table(
+        &ctx,
+        Arc::clone(&store),
+        &format!("memory:///{prefix}/"),
+        table_schema,
+    )
+    .await;
+
+    ctx
+}
+
+async fn assert_nested_list_struct_schema_evolution(kind: NestedListKind) -> 
Result<()> {
+    let old_batch = nested_messages_batch(
+        kind,
+        1,
+        &[
+            MessageValue {

Review Comment:
   The abstraction was meant to avoid repeating the same nested batch 
construction for both `List` and `LargeList`, and to make it easy to generate 
the “old schema” and “new schema” Parquet files that differ only by nested 
struct fields. 
   
   The goal here was to exercise the full Parquet scan + schema adaptation path 
end-to-end, not just a unit helper. That said, I agree the current helpers 
obscure the intent. I'll add comments that explain the old/new file shapes and 
why we need actual nested Parquet batches.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to