alamb commented on code in PR #16835:
URL: https://github.com/apache/datafusion/pull/16835#discussion_r2231878327


##########
datafusion/core/tests/parquet/schema_adapter.rs:
##########
@@ -370,3 +378,321 @@ async fn test_custom_schema_adapter_and_custom_expression_adapter() {
     ];
     assert_batches_eq!(expected, &batches);
 }
+
+// ----------------------------------------------------------------------
+// Tests migrated from schema_adaptation/schema_adapter_integration_tests.rs
+// ----------------------------------------------------------------------
+
+/// A schema adapter factory that transforms column names to uppercase
+#[derive(Debug, PartialEq)]
+struct UppercaseAdapterFactory {}
+
+impl SchemaAdapterFactory for UppercaseAdapterFactory {

Review Comment:
   Moving the tests here sort of implies they are only related to parquet -- don't we apply schema adapters to other formats too?
   
   ~However, since all the tests use parquet this seems like a good place to 
put them~
   
   Update: they don't all use parquet
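   
   Purely as an illustration (not part of this PR, and untested): a minimal sketch of the same adapter factory wired into a CSV scan. It assumes `CsvSource` also accepts a schema adapter factory via `FileSource::with_schema_adapter_factory`, which is exactly the capability in question here:
   
   ```rust
   #[tokio::test]
   async fn test_csv_integration_with_schema_adapter() -> Result<()> {
       // Write a small CSV file to an in-memory object store
       // (assumes the `bytes` crate is available to this test crate)
       let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
       let store_url = ObjectStoreUrl::parse("memory://").unwrap();
       let path = "test.csv";
       let data = bytes::Bytes::from("id,name\n1,a\n2,b\n3,c\n");
       store.put(&Path::from(path), data.into()).await?;
       let file_size = store.head(&Path::from(path)).await?.size;
   
       let ctx = SessionContext::new();
       ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
   
       // Reuse the UppercaseAdapterFactory defined for the parquet tests.
       // Assumption: CsvSource supports FileSource::with_schema_adapter_factory.
       let file_source = CsvSource::new(true, b',', b'"')
           .with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}))?;
   
       // Table schema uses uppercase names; the adapter maps them to the
       // lowercase names in the file
       let table_schema = Arc::new(Schema::new(vec![
           Field::new("ID", DataType::Int32, false),
           Field::new("NAME", DataType::Utf8, true),
       ]));
   
       let config = FileScanConfigBuilder::new(store_url, table_schema, file_source)
           .with_file(PartitionedFile::new(path, file_size))
           .build();
       let exec = DataSourceExec::from_data_source(config);
   
       let stream = exec.execute(0, ctx.task_ctx())?;
       let batches = datafusion::physical_plan::common::collect(stream).await?;
       assert_eq!(batches[0].schema().field(0).name(), "ID");
       Ok(())
   }
   ```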



##########
datafusion/core/tests/parquet/schema_adapter.rs:
##########
@@ -370,3 +378,321 @@ async fn test_custom_schema_adapter_and_custom_expression_adapter() {
     ];
     assert_batches_eq!(expected, &batches);
 }
+
+// ----------------------------------------------------------------------
+// Tests migrated from schema_adaptation/schema_adapter_integration_tests.rs
+// ----------------------------------------------------------------------
+
+/// A schema adapter factory that transforms column names to uppercase
+#[derive(Debug, PartialEq)]
+struct UppercaseAdapterFactory {}
+
+impl SchemaAdapterFactory for UppercaseAdapterFactory {
+    fn create(
+        &self,
+        projected_table_schema: SchemaRef,
+        _table_schema: SchemaRef,
+    ) -> Box<dyn SchemaAdapter> {
+        Box::new(UppercaseAdapter {
+            table_schema: projected_table_schema,
+        })
+    }
+}
+
+/// Schema adapter that transforms column names to uppercase
+#[derive(Debug)]
+struct UppercaseAdapter {
+    table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for UppercaseAdapter {
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+        let field = self.table_schema.field(index);
+        let uppercase_name = field.name().to_uppercase();
+        file_schema
+            .fields()
+            .iter()
+            .position(|f| f.name().to_uppercase() == uppercase_name)
+    }
+
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        let mut projection = Vec::new();
+
+        // Map each field in the table schema to the corresponding field in the file schema
+        for table_field in self.table_schema.fields() {
+            let uppercase_name = table_field.name().to_uppercase();
+            if let Some(pos) = file_schema
+                .fields()
+                .iter()
+                .position(|f| f.name().to_uppercase() == uppercase_name)
+            {
+                projection.push(pos);
+            }
+        }
+
+        let mapper = UppercaseSchemaMapper {
+            output_schema: self.output_schema(),
+            projection: projection.clone(),
+        };
+
+        Ok((Arc::new(mapper), projection))
+    }
+}
+
+impl UppercaseAdapter {
+    fn output_schema(&self) -> SchemaRef {
+        let fields: Vec<Field> = self
+            .table_schema
+            .fields()
+            .iter()
+            .map(|f| {
+                Field::new(
+                    f.name().to_uppercase().as_str(),
+                    f.data_type().clone(),
+                    f.is_nullable(),
+                )
+            })
+            .collect();
+
+        Arc::new(Schema::new(fields))
+    }
+}
+
+#[derive(Debug)]
+struct UppercaseSchemaMapper {
+    output_schema: SchemaRef,
+    projection: Vec<usize>,
+}
+
+impl SchemaMapper for UppercaseSchemaMapper {
+    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+        let columns = self
+            .projection
+            .iter()
+            .map(|&i| batch.column(i).clone())
+            .collect::<Vec<_>>();
+        Ok(RecordBatch::try_new(self.output_schema.clone(), columns)?)
+    }
+
+    fn map_column_statistics(
+        &self,
+        stats: &[ColumnStatistics],
+    ) -> Result<Vec<ColumnStatistics>> {
+        Ok(self
+            .projection
+            .iter()
+            .map(|&i| stats.get(i).cloned().unwrap_or_default())
+            .collect())
+    }
+}
+
+#[cfg(feature = "parquet")]
+#[tokio::test]
+async fn test_parquet_integration_with_schema_adapter() -> Result<()> {
+    // Create test data
+    let batch = RecordBatch::try_new(
+        Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+        ])),
+        vec![
+            Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),
+            Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])),
+        ],
+    )?;
+
+    let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+    let store_url = ObjectStoreUrl::parse("memory://").unwrap();
+    let path = "test.parquet";
+    write_parquet(batch.clone(), store.clone(), path).await;
+
+    // Get the actual file size from the object store
+    let object_meta = store.head(&Path::from(path)).await?;
+    let file_size = object_meta.size;
+
+    // Create a session context and register the object store
+    let ctx = SessionContext::new();
+    ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
+
+    // Create a ParquetSource with the adapter factory
+    let file_source = ParquetSource::default()
+        .with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}))?;
+
+    // Create a table schema with uppercase column names
+    let table_schema = Arc::new(Schema::new(vec![
+        Field::new("ID", DataType::Int32, false),
+        Field::new("NAME", DataType::Utf8, true),
+    ]));
+
+    let config = FileScanConfigBuilder::new(store_url, table_schema.clone(), file_source)
+        .with_file(PartitionedFile::new(path, file_size))
+        .build();
+
+    // Create a data source executor
+    let exec = DataSourceExec::from_data_source(config);
+
+    // Collect results
+    let task_ctx = ctx.task_ctx();
+    let stream = exec.execute(0, task_ctx)?;
+    let batches = datafusion::physical_plan::common::collect(stream).await?;
+
+    // There should be one batch
+    assert_eq!(batches.len(), 1);
+
+    // Verify the schema has the uppercase column names
+    let result_schema = batches[0].schema();
+    assert_eq!(result_schema.field(0).name(), "ID");
+    assert_eq!(result_schema.field(1).name(), "NAME");
+
+    Ok(())
+}
+
+#[cfg(feature = "parquet")]
+#[tokio::test]
+async fn test_parquet_integration_with_schema_adapter_and_expression_rewriter(
+) -> Result<()> {
+    // Create test data
+    let batch = RecordBatch::try_new(
+        Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, true),
+        ])),
+        vec![
+            Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),
+            Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])),
+        ],
+    )?;
+
+    let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+    let store_url = ObjectStoreUrl::parse("memory://").unwrap();
+    let path = "test.parquet";
+    write_parquet(batch.clone(), store.clone(), path).await;
+
+    // Get the actual file size from the object store
+    let object_meta = store.head(&Path::from(path)).await?;
+    let file_size = object_meta.size;
+
+    // Create a session context and register the object store
+    let ctx = SessionContext::new();
+    ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
+
+    // Create a ParquetSource with the adapter factory
+    let file_source = ParquetSource::default()
+        .with_schema_adapter_factory(Arc::new(UppercaseAdapterFactory {}))?;
+
+    let config = FileScanConfigBuilder::new(store_url, batch.schema(), file_source)
+        .with_file(PartitionedFile::new(path, file_size))
+        .build();
+
+    // Create a data source executor
+    let exec = DataSourceExec::from_data_source(config);
+
+    // Collect results
+    let task_ctx = ctx.task_ctx();
+    let stream = exec.execute(0, task_ctx)?;
+    let batches = datafusion::physical_plan::common::collect(stream).await?;
+
+    // There should be one batch
+    assert_eq!(batches.len(), 1);
+
+    // Verify the schema has the original column names (schema adapter not applied in DataSourceExec)
+    let result_schema = batches[0].schema();
+    assert_eq!(result_schema.field(0).name(), "id");
+    assert_eq!(result_schema.field(1).name(), "name");
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_multi_source_schema_adapter_reuse() -> Result<()> {

Review Comment:
   actually, I missed this one before -- given this is testing formats other 
than parquet, I think we should move it back into core_integration. 
   
   Here is a suggestion how: 
https://github.com/apache/datafusion/issues/16801#issuecomment-3120115929
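   
   A hypothetical sketch of one way to do that (the linked suggestion may describe a different layout): declare the format-agnostic tests as their own module from the core integration harness instead of under `tests/parquet`:
   
   ```rust
   // datafusion/core/tests/core_integration.rs (sketch only; exact module
   // name and layout per the linked suggestion may differ)
   
   /// Schema adaptation tests that are not tied to a single file format
   mod schema_adaptation;
   ```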



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

