alamb commented on code in PR #19345:
URL: https://github.com/apache/datafusion/pull/19345#discussion_r2624086682
##########
docs/source/library-user-guide/upgrading.md:
##########
@@ -490,6 +490,24 @@ If you were using a custom `SchemaAdapterFactory` for schema adaptation (e.g., d
 See the [default column values example](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/custom_data_source/default_column_values.rs) for how to implement a custom `PhysicalExprAdapterFactory`.
+### `SchemaAdapter` and `SchemaAdapterFactory` completely removed
+
+The following symbols have been completely removed from DataFusion:
+
+- `SchemaAdapter` trait
+- `SchemaAdapterFactory` trait
+- `SchemaMapper` trait
+- `SchemaMapping` struct
+- `DefaultSchemaAdapterFactory` struct
+
+These types were previously used to adapt record batch schemas during file reading.

Review Comment:
   This is likely to cause non-trivial pain for anyone who uses the `SchemaAdapter` during the upgrade. However, I am not sure if leaving the code in but disconnected would be any better. Thus I think we should go with this PR, and we can help with some more writeups when we start testing the upgrade with downstream crates (like delta.rs).
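For readers hit by this removal, a minimal sketch of what a replacement might look like is below. It is modeled on the `UppercasePhysicalExprAdapter` in the removed test file quoted further down; the `RenamingExprAdapterFactory` / `RenamingExprAdapter` names are hypothetical, and the trait signatures are copied from that removed code, so they may differ in other DataFusion versions.

```rust
// Hypothetical port of a column-renaming SchemaAdapter to the
// PhysicalExprAdapter API, following the shape of the removed
// UppercasePhysicalExprAdapter test code. Names are illustrative only.
use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;

/// Factory handed to the scan; it receives the logical and physical file schemas.
#[derive(Debug)]
struct RenamingExprAdapterFactory;

impl PhysicalExprAdapterFactory for RenamingExprAdapterFactory {
    fn create(
        &self,
        _logical_file_schema: SchemaRef,
        physical_file_schema: SchemaRef,
    ) -> Arc<dyn PhysicalExprAdapter> {
        Arc::new(RenamingExprAdapter {
            physical_file_schema,
        })
    }
}

#[derive(Debug)]
struct RenamingExprAdapter {
    physical_file_schema: SchemaRef,
}

impl PhysicalExprAdapter for RenamingExprAdapter {
    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
        // Rebind each column reference to the matching (case-insensitive)
        // column of the physical file schema.
        expr.transform(|e| {
            if let Some(column) = e.as_any().downcast_ref::<Column>() {
                let target = column.name().to_lowercase();
                if let Ok(idx) = self.physical_file_schema.index_of(&target) {
                    return Ok(Transformed::yes(
                        Arc::new(Column::new(&target, idx)) as Arc<dyn PhysicalExpr>,
                    ));
                }
            }
            Ok(Transformed::no(e))
        })
        .data()
    }
}
```

The key shift is that instead of rewriting `RecordBatch`es after they are read (the old `SchemaMapper` approach), the adapter rewrites physical expressions so that they evaluate directly against the file's physical schema.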
##########
datafusion/core/tests/schema_adapter/schema_adapter_integration_tests.rs:
##########
@@ -1,752 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::sync::Arc;
-
-use arrow::array::RecordBatch;
-
-use arrow_schema::{DataType, Field, Schema, SchemaRef};
-use bytes::{BufMut, BytesMut};
-use datafusion::common::Result;
-use datafusion::config::{ConfigOptions, TableParquetOptions};
-use datafusion::datasource::listing::PartitionedFile;
-#[cfg(feature = "parquet")]
-use datafusion::datasource::physical_plan::ParquetSource;
-use datafusion::datasource::physical_plan::{
-    ArrowSource, CsvSource, FileSource, JsonSource,
-};
-use datafusion::logical_expr::{col, lit};
-use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::SessionContext;
-use datafusion_common::config::CsvOptions;
-use datafusion_common::record_batch;
-use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::ColumnStatistics;
-use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
-use datafusion_datasource::schema_adapter::{
-    SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
-};
-
-use datafusion::assert_batches_eq;
-use datafusion_datasource::source::DataSourceExec;
-use datafusion_datasource::TableSchema;
-use datafusion_execution::object_store::ObjectStoreUrl;
-use datafusion_expr::Expr;
-use datafusion_physical_expr::expressions::Column;
-use datafusion_physical_expr::planner::logical2physical;
-use datafusion_physical_expr::projection::ProjectionExprs;
-use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory};
-use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use object_store::{memory::InMemory, path::Path, ObjectStore};
-use parquet::arrow::ArrowWriter;
-
-async fn write_parquet(batch: RecordBatch, store: Arc<dyn ObjectStore>, path: &str) {
-    write_batches_to_parquet(&[batch], store, path).await;
-}
-
-/// Write RecordBatches to a Parquet file with each batch in its own row group.
-async fn write_batches_to_parquet(
-    batches: &[RecordBatch],
-    store: Arc<dyn ObjectStore>,
-    path: &str,
-) -> usize {
-    let mut out = BytesMut::new().writer();
-    {
-        let mut writer =
-            ArrowWriter::try_new(&mut out, batches[0].schema(), None).unwrap();
-        for batch in batches {
-            writer.write(batch).unwrap();
-            writer.flush().unwrap();
-        }
-        writer.finish().unwrap();
-    }
-    let data = out.into_inner().freeze();
-    let file_size = data.len();
-    store.put(&Path::from(path), data.into()).await.unwrap();
-    file_size
-}
-
-/// A schema adapter factory that transforms column names to uppercase
-#[derive(Debug, PartialEq)]
-struct UppercaseAdapterFactory {}
-
-impl SchemaAdapterFactory for UppercaseAdapterFactory {
-    fn create(
-        &self,
-        projected_table_schema: SchemaRef,
-        _table_schema: SchemaRef,
-    ) -> Box<dyn SchemaAdapter> {
-        Box::new(UppercaseAdapter {
-            table_schema: projected_table_schema,
-        })
-    }
-}
-
-/// Schema adapter that transforms column names to uppercase
-#[derive(Debug)]
-struct UppercaseAdapter {
-    table_schema: SchemaRef,
-}
-
-impl SchemaAdapter for UppercaseAdapter {
-    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
-        let field = self.table_schema.field(index);
-        let uppercase_name = field.name().to_uppercase();
-        file_schema
-            .fields()
-            .iter()
-            .position(|f| f.name().to_uppercase() == uppercase_name)
-    }
-
-    fn map_schema(
-        &self,
-        file_schema: &Schema,
-    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-        let mut projection = Vec::new();
-
-        // Map each field in the table schema to the corresponding field in the file schema
-        for table_field in self.table_schema.fields() {
-            let uppercase_name = table_field.name().to_uppercase();
-            if let Some(pos) = file_schema
-                .fields()
-                .iter()
-                .position(|f| f.name().to_uppercase() == uppercase_name)
-            {
-                projection.push(pos);
-            }
-        }
-
-        let mapper = UppercaseSchemaMapper {
-            output_schema: self.output_schema(),
-            projection: projection.clone(),
-        };
-
-        Ok((Arc::new(mapper), projection))
-    }
-}
-
-impl UppercaseAdapter {
-    fn output_schema(&self) -> SchemaRef {
-        let fields: Vec<Field> = self
-            .table_schema
-            .fields()
-            .iter()
-            .map(|f| {
-                Field::new(
-                    f.name().to_uppercase().as_str(),
-                    f.data_type().clone(),
-                    f.is_nullable(),
-                )
-            })
-            .collect();
-
-        Arc::new(Schema::new(fields))
-    }
-}
-
-#[derive(Debug)]
-struct UppercaseSchemaMapper {
-    output_schema: SchemaRef,
-    projection: Vec<usize>,
-}
-
-impl SchemaMapper for UppercaseSchemaMapper {
-    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
-        let columns = self
-            .projection
-            .iter()
-            .map(|&i| batch.column(i).clone())
-            .collect::<Vec<_>>();
-        Ok(RecordBatch::try_new(self.output_schema.clone(), columns)?)
-    }
-
-    fn map_column_statistics(
-        &self,
-        stats: &[ColumnStatistics],
-    ) -> Result<Vec<ColumnStatistics>> {
-        Ok(self
-            .projection
-            .iter()
-            .map(|&i| stats.get(i).cloned().unwrap_or_default())
-            .collect())
-    }
-}
-
-/// A physical expression adapter factory that maps uppercase column names to lowercase
-#[derive(Debug)]
-struct UppercasePhysicalExprAdapterFactory;
-
-impl PhysicalExprAdapterFactory for UppercasePhysicalExprAdapterFactory {
-    fn create(
-        &self,
-        _logical_file_schema: SchemaRef,
-        physical_file_schema: SchemaRef,
-    ) -> Arc<dyn PhysicalExprAdapter> {
-        Arc::new(UppercasePhysicalExprAdapter {
-            physical_file_schema,
-        })
-    }
-}
-
-#[derive(Debug)]
-struct UppercasePhysicalExprAdapter {
-    physical_file_schema: SchemaRef,
-}
-
-impl PhysicalExprAdapter for UppercasePhysicalExprAdapter {
-    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
-        expr.transform(|e| {
-            if let Some(column) = e.as_any().downcast_ref::<Column>() {
-                // Map uppercase column name (from logical schema) to lowercase (in physical file)
-                let lowercase_name = column.name().to_lowercase();
-                if let Ok(idx) = self.physical_file_schema.index_of(&lowercase_name) {
-                    return Ok(Transformed::yes(
-                        Arc::new(Column::new(&lowercase_name, idx))
-                            as Arc<dyn PhysicalExpr>,
-                    ));
-                }
-            }
-            Ok(Transformed::no(e))
-        })
-        .data()
-    }
-}
-
-#[derive(Clone)]
-struct ParquetTestCase {
-    table_schema: TableSchema,
-    batches: Vec<RecordBatch>,
-    predicate: Option<Expr>,
-    projection: Option<ProjectionExprs>,
-    push_down_filters: bool,
-}
-
-impl ParquetTestCase {
-    fn new(table_schema: TableSchema, batches: Vec<RecordBatch>) -> Self {
-        Self {
-            table_schema,
-            batches,
-            predicate: None,
-            projection: None,
-            push_down_filters: true,
-        }
-    }
-
-    fn push_down_filters(mut self, pushdown_filters: bool) -> Self {
-        self.push_down_filters = pushdown_filters;
-        self
-    }
-
-    fn with_predicate(mut self, predicate: Expr) -> Self {
-        self.predicate = Some(predicate);
-        self
-    }
-
-    fn with_projection(mut self, projection: ProjectionExprs) -> Self {
-        self.projection = Some(projection);
-        self
-    }
-
-    async fn execute(self) -> Result<Vec<RecordBatch>> {
-        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
-        let store_url = ObjectStoreUrl::parse("memory://").unwrap();
-        let path = "test.parquet";
-        let file_size =
-            write_batches_to_parquet(&self.batches, store.clone(), path).await;
-
-        let ctx = SessionContext::new();
-        ctx.register_object_store(store_url.as_ref(), Arc::clone(&store));
-
-        let mut table_options = TableParquetOptions::default();
-        // controlled via ConfigOptions flag; ParquetSources ORs them so if either is true then pushdown is enabled
-        table_options.global.pushdown_filters = false;
-        let mut file_source = Arc::new(
-            ParquetSource::new(self.table_schema.table_schema().clone())
-                .with_table_parquet_options(table_options),
-        ) as Arc<dyn FileSource>;
-
-        if let Some(projection) = self.projection {
-            file_source = file_source.try_pushdown_projection(&projection)?.unwrap();
-        }
-
-        if let Some(predicate) = &self.predicate {
-            let filter_expr =
-                logical2physical(predicate, self.table_schema.table_schema());
-            let mut config = ConfigOptions::default();
-            config.execution.parquet.pushdown_filters = self.push_down_filters;
-            let result = file_source.try_pushdown_filters(vec![filter_expr], &config)?;
-            file_source = result.updated_node.unwrap();
-        }
-
-        let config = FileScanConfigBuilder::new(store_url.clone(), file_source)
-            .with_file(PartitionedFile::new(path, file_size as u64)) // size 0 for test
-            .with_expr_adapter(None)
-            .build();
-
-        let exec = DataSourceExec::from_data_source(config);
-        let task_ctx = ctx.task_ctx();
-        let stream = exec.execute(0, task_ctx)?;
-        datafusion::physical_plan::common::collect(stream).await
-    }
-}
-
-/// Test reading and filtering a Parquet file where the table schema is flipped (c, b, a) vs. the physical file schema (a, b, c)

Review Comment:
   Are these scenarios covered elsewhere? I feel like (now) we could write these all as .slt tests. Looks like some of it is covered here: https://github.com/apache/datafusion/blob/main/datafusion/sqllogictest/test_files/schema_evolution.slt

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
