alamb commented on a change in pull request #1622:
URL: https://github.com/apache/arrow-datafusion/pull/1622#discussion_r790137229
##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -457,22 +518,313 @@ fn read_partition(
 #[cfg(test)]
 mod tests {
-    use crate::datasource::{
-        file_format::{parquet::ParquetFormat, FileFormat},
-        object_store::local::{
-            local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
+    use crate::{
+        assert_batches_sorted_eq,
+        datasource::{
+            file_format::{parquet::ParquetFormat, FileFormat},
+            object_store::local::{
+                local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
+            },
         },
+        physical_plan::collect,
     };

     use super::*;
-    use arrow::datatypes::{DataType, Field};
+    use arrow::{
+        array::{Int64Array, Int8Array, StringArray},
+        datatypes::{DataType, Field},
+    };
     use futures::StreamExt;
     use parquet::{
+        arrow::ArrowWriter,
         basic::Type as PhysicalType,
-        file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
+        file::{
+            metadata::RowGroupMetaData, properties::WriterProperties,
+            statistics::Statistics as ParquetStatistics,
+        },
         schema::types::SchemaDescPtr,
     };

+    /// writes each RecordBatch as an individual parquet file and then
+    /// reads it back in to the named location.
+    async fn round_trip_to_parquet(
+        batches: Vec<RecordBatch>,
+        projection: Option<Vec<usize>>,
+        schema: Option<SchemaRef>,
+    ) -> Vec<RecordBatch> {
+        // When vec is dropped, temp files are deleted
+        let files: Vec<_> = batches
+            .into_iter()
+            .map(|batch| {
+                let output = tempfile::NamedTempFile::new().expect("creating temp file");
+
+                let props = WriterProperties::builder().build();
+                let file: std::fs::File = (*output.as_file())
+                    .try_clone()
+                    .expect("cloning file descriptor");
+                let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))
+                    .expect("creating writer");
+
+                writer.write(&batch).expect("Writing batch");
+                writer.close().unwrap();
+                output
+            })
+            .collect();
+
+        let file_names: Vec<_> = files
+            .iter()
+            .map(|t| t.path().to_string_lossy().to_string())
+            .collect();
+
+        // Now, read the files back in
+        let file_groups: Vec<_> = file_names
+            .iter()
+            .map(|name| local_unpartitioned_file(name.clone()))
+            .collect();
+
+        // Infer the schema (if not provided)
+        let file_schema = match schema {
+            Some(provided_schema) => provided_schema,
+            None => ParquetFormat::default()
+                .infer_schema(local_object_reader_stream(file_names))
+                .await
+                .expect("inferring schema"),
+        };
+
+        // prepare the scan
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                object_store: Arc::new(LocalFileSystem {}),
+                file_groups: vec![file_groups],
+                file_schema,
+                statistics: Statistics::default(),
+                projection,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            None,
+        );
+
+        let runtime = Arc::new(RuntimeEnv::default());
+        collect(Arc::new(parquet_exec), runtime)
+            .await
+            .expect("reading parquet data")
+    }
+
+    // Add a new column with the specified field name to the RecordBatch
+    fn add_to_batch(
+        batch: &RecordBatch,
+        field_name: &str,
+        array: ArrayRef,
+    ) -> RecordBatch {
+        let mut fields = batch.schema().fields().clone();
+        fields.push(Field::new(field_name, array.data_type().clone(), true));
+        let schema = Arc::new(Schema::new(fields));
+
+        let mut columns = batch.columns().to_vec();
+        columns.push(array);
+        RecordBatch::try_new(schema, columns).expect("error; creating record batch")
+    }
+
+    fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
+        columns.into_iter().fold(
+            RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+            |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
+        )
+    }
+
+    #[tokio::test]
+    async fn evolved_schema() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+        // batch1: c1(string)
+        let batch1 = add_to_batch(
+            &RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+            "c1",
+            c1,
+        );
+
+        // batch2: c1(string) and c2(int64)
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+        let batch2 = add_to_batch(&batch1, "c2", c2);
+
+        // batch3: c1(string) and c3(int8)
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+        let batch3 = add_to_batch(&batch1, "c3", c3);
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None, None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c2 | c3 |",
+            "+-----+----+----+",
+            "|     |    |    |",
+            "|     |    | 20 |",
+            "|     | 2  |    |",
+            "| Foo |    |    |",
+            "| Foo |    | 10 |",
+            "| Foo | 1  |    |",
+            "| bar |    |    |",
+            "| bar |    |    |",
+            "| bar |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_inconsistent_order() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c2 | c3 |",
+            "+-----+----+----+",
+            "| Foo | 1  | 10 |",
+            "|     | 2  | 20 |",
+            "| bar |    |    |",
+            "| Foo | 1  | 10 |",
+            "|     | 2  | 20 |",
+            "| bar |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_intersection() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c3 | c2 |",
+            "+-----+----+----+",
+            "| Foo | 10 |    |",
+            "|     | 20 |    |",
+            "| bar |    |    |",
+            "|     | 10 | 1  |",
+            "|     | 20 | 2  |",
+            "|     |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_projection() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        let c4: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("baz"), Some("boo"), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string), c4(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1), ("c4", c4)]);
+
+        // read/write them files:
+        let read =
+            round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]), None).await;

Review comment:
using the projection index `3` is a good choice 👍 (as it requires projecting on the merged schema)

##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -385,9 +389,33 @@ fn build_row_group_predicate(
     }
 }

+// Map projections from the schema which merges all file schemas to projections on a particular
+// file
+fn map_projections(
+    merged_schema: &Schema,
+    file_schema: &Schema,
+    projections: &[usize],
+) -> Result<Vec<usize>> {
+    let mut mapped: Vec<usize> = vec![];
+    for idx in projections {
+        let field = merged_schema.field(*idx);
+        if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
+            if file_schema.field(mapped_idx).data_type() == field.data_type() {
+                mapped.push(mapped_idx)
+            } else {
+                let msg = format!("Failed to map column projection for field {}. Incompatible data types {:?} and {:?}", field.name(), file_schema.field(mapped_idx).data_type(), field.data_type());
+                error!("{}", msg);
+                return Err(DataFusionError::ParquetError(ParquetError::General(msg)));

Review comment:
I recommend:

1. Using an `info` message (as depending on the use case this may signal a user error rather than something to investigate)
2. Using a `DataFusionError` rather than one from Parquet (the rationale being that this error comes from DataFusion, and is not related to being able to read the parquet files)

```suggestion
                let msg = format!("Failed to map column projection for field {}. Incompatible data types {:?} and {:?}", field.name(), file_schema.field(mapped_idx).data_type(), field.data_type());
                info!("{}", msg);
                return Err(DataFusionError::Execution(msg));
```
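To make the projection mapping concrete, here is a small, self-contained sketch. It is not code from this PR: `map_projections_sketch` is a hypothetical stand-in for the `map_projections` function shown above, and it assumes the `arrow` crate's `Schema`/`Field` API already used in this diff. It illustrates why projecting `[0, 3]` on the merged schema exercises the per-file remapping:

```rust
use arrow::datatypes::{DataType, Field, Schema};

// Simplified restatement of the mapping logic: for each projected index in the
// merged schema, find the column with the same name and type in the file's own
// schema and record its index there; columns missing from a file are skipped.
fn map_projections_sketch(merged: &Schema, file: &Schema, proj: &[usize]) -> Vec<usize> {
    proj.iter()
        .filter_map(|&i| {
            let field = merged.field(i);
            file.index_of(field.name())
                .ok()
                .filter(|&j| file.field(j).data_type() == field.data_type())
        })
        .collect()
}

fn main() {
    // Merged schema over both files in `evolved_schema_projection`
    let merged = Schema::new(vec![
        Field::new("c1", DataType::Utf8, true),
        Field::new("c2", DataType::Int64, true),
        Field::new("c3", DataType::Int8, true),
        Field::new("c4", DataType::Utf8, true),
    ]);
    // batch2 was written with columns in the order c3, c2, c1, c4
    let file2 = Schema::new(vec![
        Field::new("c3", DataType::Int8, true),
        Field::new("c2", DataType::Int64, true),
        Field::new("c1", DataType::Utf8, true),
        Field::new("c4", DataType::Utf8, true),
    ]);
    // Projection [0, 3] ("c1", "c4") on the merged schema becomes [2, 3] for this file
    assert_eq!(map_projections_sketch(&merged, &file2, &[0, 3]), vec![2, 3]);
}
```

For the file written from `batch1`, which has no `c4`, the same projection would map to just `[0]`, since the code shown above simply skips columns that are absent from a file's schema.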
##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -457,22 +518,313 @@ fn read_partition(
+        let expected = vec![
+            "+-----+-----+",
+            "| c1  | c4  |",
+            "+-----+-----+",
+            "| Foo | baz |",
+            "|     | boo |",
+            "| bar |     |",
+            "| Foo |     |",
+            "|     |     |",
+            "| bar |     |",
+            "+-----+-----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_incompatible_types() {

Review comment:
This test name implies it is for incompatible types, but then merges two files with compatible types. Perhaps you could switch one of the column types and then assert a panic like

```suggestion
    #[should_panic(expected = "incorrect types")]
    async fn evolved_schema_incompatible_types() {
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org