alamb commented on code in PR #20840:
URL: https://github.com/apache/datafusion/pull/20840#discussion_r2988000406


##########
datafusion/core/tests/parquet/expr_adapter.rs:
##########
@@ -54,6 +56,399 @@ async fn write_parquet(batch: RecordBatch, store: Arc<dyn ObjectStore>, path: &s
     store.put(&Path::from(path), data.into()).await.unwrap();
 }
 
+/// Which Arrow list encoding wraps the `messages` column.
+///
+/// Lets the nested-list helpers build either a `List` (i32 offsets) or a
+/// `LargeList` (i64 offsets) column from the same inputs.
+#[derive(Clone, Copy, Debug)]
+enum NestedListKind {
+    List,
+    LargeList,
+}
+
+impl NestedListKind {
+    /// The list `DataType` whose elements are described by `item_field`.
+    fn field_data_type(self, item_field: Arc<Field>) -> DataType {
+        match self {
+            Self::LargeList => DataType::LargeList(item_field),
+            Self::List => DataType::List(item_field),
+        }
+    }
+
+    /// Wraps `values` in a list array of this kind.
+    ///
+    /// Row `i` holds the next `lengths[i]` consecutive elements of `values`.
+    /// No null rows are produced.
+    fn array(
+        self,
+        item_field: Arc<Field>,
+        lengths: Vec<usize>,
+        values: ArrayRef,
+    ) -> ArrayRef {
+        match self {
+            Self::LargeList => {
+                let offsets = OffsetBuffer::<i64>::from_lengths(lengths);
+                Arc::new(LargeListArray::new(item_field, offsets, values, None))
+            }
+            Self::List => {
+                let offsets = OffsetBuffer::<i32>::from_lengths(lengths);
+                Arc::new(ListArray::new(item_field, offsets, values, None))
+            }
+        }
+    }
+
+    /// Short identifier for this kind, used to keep per-kind store prefixes apart.
+    fn name(self) -> &'static str {
+        match self {
+            Self::LargeList => "large_list",
+            Self::List => "list",
+        }
+    }
+}
+
+/// One logical element of the `messages` list, independent of any file schema.
+///
+/// Tests describe list elements as `MessageValue`s, and `nested_messages_batch`
+/// then encodes them against a particular set of struct `fields`:
+/// - members the schema lacks (`chain`, `ignored`) are not written;
+/// - `chain` is written either as a plain `Utf8` column or as a struct whose
+///   only child holds the string, depending on the schema.
+///
+/// This lets the same values be written under an "old" and a "new" file schema.
+#[derive(Debug)]
+struct MessageValue<'a> {
+    // Always written, as `Int32`.
+    id: i32,
+    // Always written, as nullable `Utf8`.
+    name: &'a str,
+    // Written only when the target `fields` contain `chain`.
+    chain: Option<&'a str>,
+    // Written only when the target `fields` contain `ignored`.
+    ignored: Option<i32>,
+}
+
+/// Struct fields of a `messages` list element as written to a parquet file.
+///
+/// `id` (non-null `Int32`) and `name` (nullable `Utf8`) always come first.
+/// `chain` (typed `chain_type`) and then `ignored` (nullable `Int32`) are
+/// appended only when the matching `include_*` flag is set.
+fn message_fields(
+    chain_type: DataType,
+    chain_nullable: bool,
+    include_chain: bool,
+    include_ignored: bool,
+) -> Fields {
+    let chain = include_chain
+        .then(|| Arc::new(Field::new("chain", chain_type, chain_nullable)));
+    let ignored = include_ignored
+        .then(|| Arc::new(Field::new("ignored", DataType::Int32, true)));
+
+    let fields: Vec<_> = [
+        Arc::new(Field::new("id", DataType::Int32, false)),
+        Arc::new(Field::new("name", DataType::Utf8, true)),
+    ]
+    .into_iter()
+    .chain(chain)
+    .chain(ignored)
+    .collect();
+    fields.into()
+}
+
+/// Struct fields of a `messages` list element in the table schema used by the
+/// struct-evolution tests.
+///
+/// Contains `id` as non-null `Int64` (files write it as `Int32`), nullable
+/// `name: Utf8`, and `chain` with the given type and nullability. There is no
+/// `ignored` member.
+fn target_message_fields(chain_type: DataType, chain_nullable: bool) -> Fields {
+    let id = Field::new("id", DataType::Int64, false);
+    let name = Field::new("name", DataType::Utf8, true);
+    let chain = Field::new("chain", chain_type, chain_nullable);
+    Fields::from(vec![Arc::new(id), Arc::new(name), Arc::new(chain)])
+}
+
+// Helper to build message columns in canonical order (id, name, chain, 
ignored)
+// based on which optional fields are present in the schema.
+fn build_message_columns(
+    id_array: &ArrayRef,
+    name_array: &ArrayRef,
+    chain_vec: &[Option<&str>],
+    ignored_array: &ArrayRef,
+    fields: &Fields,
+) -> Vec<ArrayRef> {
+    let mut columns = vec![Arc::clone(id_array), Arc::clone(name_array)];
+
+    for field in fields.iter().skip(2) {
+        match field.name().as_str() {
+            "chain" => {
+                let chain_array = match field.data_type() {
+                    DataType::Utf8 => {
+                        Arc::new(StringArray::from(chain_vec.to_vec())) as 
ArrayRef
+                    }
+                    DataType::Struct(chain_fields) => {
+                        let chain_struct = StructArray::new(
+                            chain_fields.clone(),
+                            
vec![Arc::new(StringArray::from(chain_vec.to_vec()))
+                                as ArrayRef],
+                            None,
+                        );
+                        Arc::new(chain_struct) as ArrayRef
+                    }
+                    other => panic!("unexpected chain field type: {other:?}"),
+                };
+                columns.push(chain_array);
+            }
+            "ignored" => columns.push(Arc::clone(ignored_array)),
+            _ => {}
+        }
+    }
+    columns
+}
+
+/// Encodes `messages` as a one-row `RecordBatch`.
+///
+/// The batch has two columns: `row_id: Int32` and `messages`, a
+/// `List<Struct>` or `LargeList<Struct>` depending on `kind`. The single list
+/// entry holds every message, in order.
+///
+/// `fields` is the physical struct layout to write (e.g. built by
+/// `message_fields`). This lets the same logical messages be stored under
+/// different file schemas: with or without `chain`/`ignored`, or with `chain`
+/// as a string vs. a struct.
+fn nested_messages_batch(
+    kind: NestedListKind,
+    row_id: i32,
+    messages: &[MessageValue<'_>],
+    fields: &Fields,
+) -> RecordBatch {
+    let item_field = Arc::new(Field::new("item", DataType::Struct(fields.clone()), true));
+
+    // Column-wise views of the messages, one Vec per struct member.
+    let ids: Vec<i32> = messages.iter().map(|m| m.id).collect();
+    let names: Vec<Option<&str>> = messages.iter().map(|m| Some(m.name)).collect();
+    let chains: Vec<Option<&str>> = messages.iter().map(|m| m.chain).collect();
+    let ignored: Vec<Option<i32>> = messages.iter().map(|m| m.ignored).collect();
+
+    let id_array = Arc::new(Int32Array::from(ids)) as ArrayRef;
+    let name_array = Arc::new(StringArray::from(names)) as ArrayRef;
+    let ignored_array = Arc::new(Int32Array::from(ignored)) as ArrayRef;
+
+    // Only the members present in `fields` end up in the struct.
+    let columns =
+        build_message_columns(&id_array, &name_array, &chains, &ignored_array, fields);
+    let struct_array = StructArray::new(fields.clone(), columns, None);
+
+    // `kind.array` takes ownership of the item field, so derive the list type first.
+    let messages_type = kind.field_data_type(Arc::clone(&item_field));
+    // A single length entry: all messages belong to the one output row.
+    let messages_array =
+        kind.array(item_field, vec![messages.len()], Arc::new(struct_array));
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("row_id", DataType::Int32, false),
+        Field::new("messages", messages_type, true),
+    ]));
+    RecordBatch::try_new(
+        schema,
+        vec![Arc::new(Int32Array::from(vec![row_id])) as ArrayRef, messages_array],
+    )
+    .unwrap()
+}
+
+/// Registers `store` for the `memory://` URL and exposes the files under
+/// `base_path` as table `t`.
+///
+/// The table is read with `table_schema` as its schema and has
+/// `DefaultPhysicalExprAdapterFactory` attached as its expression adapter.
+async fn register_memory_listing_table(
+    ctx: &SessionContext,
+    store: Arc<dyn ObjectStore>,
+    base_path: &str,
+    table_schema: SchemaRef,
+) {
+    let memory_url = ObjectStoreUrl::parse("memory://").unwrap();
+    ctx.register_object_store(memory_url.as_ref(), Arc::clone(&store));
+
+    let table_url = ListingTableUrl::parse(base_path).unwrap();
+    let state = ctx.state();
+    let config = ListingTableConfig::new(table_url)
+        .infer_options(&state)
+        .await
+        .unwrap()
+        .with_schema(table_schema)
+        .with_expr_adapter_factory(Arc::new(DefaultPhysicalExprAdapterFactory));
+
+    let table = Arc::new(ListingTable::try_new(config).unwrap());
+    ctx.register_table("t", table).unwrap();
+}
+
+/// Session used by these tests.
+///
+/// Statistics collection, parquet pruning and page-index pruning are all
+/// disabled, while parquet filter pushdown is enabled. Presumably this is so
+/// that filters are evaluated against decoded rows rather than files being
+/// skipped via metadata.
+fn test_context() -> SessionContext {
+    let mut config = SessionConfig::new()
+        .with_parquet_pruning(false)
+        .with_parquet_page_index_pruning(false)
+        .with_collect_statistics(false);
+    let parquet_options = &mut config.options_mut().execution.parquet;
+    parquet_options.pushdown_filters = true;
+    SessionContext::new_with_config(config)
+}
+
+/// Table schema with columns `row_id: Int32` (non-null) and a nullable
+/// `messages` list (of `kind`).
+///
+/// The list's nullable `item` elements are structs laid out by
+/// `target_message_fields`.
+fn nested_list_table_schema(
+    kind: NestedListKind,
+    target_message_fields: Fields,
+) -> SchemaRef {
+    let item = Field::new("item", DataType::Struct(target_message_fields), true);
+    let messages = Field::new("messages", kind.field_data_type(Arc::new(item)), true);
+    let row_id = Field::new("row_id", DataType::Int32, false);
+    Arc::new(Schema::new(vec![row_id, messages]))
+}
+
+/// Returns the list entries of rows 0 and 1 of `column`.
+///
+/// The column is downcast according to `kind`: `ListArray` for `List`,
+/// `LargeListArray` for `LargeList`.
+///
+/// Panics if `column` is not of the expected list type. Also assumes the
+/// column has at least two rows.
+fn extract_nested_list_values(
+    kind: NestedListKind,
+    column: &ArrayRef,
+) -> (ArrayRef, ArrayRef) {
+    let any = column.as_any();
+    match kind {
+        NestedListKind::List => {
+            let list: &ListArray = any
+                .downcast_ref()
+                .expect("messages should be a ListArray");
+            (list.value(0), list.value(1))
+        }
+        NestedListKind::LargeList => {
+            let list: &LargeListArray = any
+                .downcast_ref()
+                .expect("messages should be a LargeListArray");
+            (list.value(0), list.value(1))
+        }
+    }
+}
+
+/// Builds a session whose table `t` is backed by freshly written parquet files.
+///
+/// Steps:
+/// 1. Write each `(filename, batch)` as a parquet file under
+///    `<kind>_<prefix_base>/` in a new in-memory object store.
+/// 2. Create a `test_context()` session.
+/// 3. Register that directory as table `t`, using `table_schema`.
+async fn setup_nested_list_test(
+    kind: NestedListKind,
+    prefix_base: &str,
+    batches: Vec<(String, RecordBatch)>,
+    table_schema: SchemaRef,
+) -> SessionContext {
+    let kind_name = kind.name();
+    let prefix = format!("{kind_name}_{prefix_base}");
+    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
+
+    for (filename, batch) in batches {
+        let path = format!("{prefix}/{filename}");
+        write_parquet(batch, Arc::clone(&store), &path).await;
+    }
+
+    let table_url = format!("memory:///{prefix}/");
+    let ctx = test_context();
+    register_memory_listing_table(&ctx, store, &table_url, table_schema).await;
+    ctx
+}
+
+async fn assert_nested_list_struct_schema_evolution(kind: NestedListKind) -> 
Result<()> {
+    let old_batch = nested_messages_batch(
+        kind,
+        1,
+        &[
+            MessageValue {

Review Comment:
   I don't understand the reason to create these MessageValue / nested batches
in the test setup -- can't we just make the batches directly? If we need
another abstraction, perhaps we can at least add comments explaining what is
going on.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to