thinkharderdev commented on a change in pull request #1622: URL: https://github.com/apache/arrow-datafusion/pull/1622#discussion_r790151401
########## File path: datafusion/src/physical_plan/file_format/parquet.rs ########## @@ -457,22 +518,313 @@ fn read_partition( #[cfg(test)] mod tests { - use crate::datasource::{ - file_format::{parquet::ParquetFormat, FileFormat}, - object_store::local::{ - local_object_reader_stream, local_unpartitioned_file, LocalFileSystem, + use crate::{ + assert_batches_sorted_eq, + datasource::{ + file_format::{parquet::ParquetFormat, FileFormat}, + object_store::local::{ + local_object_reader_stream, local_unpartitioned_file, LocalFileSystem, + }, }, + physical_plan::collect, }; use super::*; - use arrow::datatypes::{DataType, Field}; + use arrow::{ + array::{Int64Array, Int8Array, StringArray}, + datatypes::{DataType, Field}, + }; use futures::StreamExt; use parquet::{ + arrow::ArrowWriter, basic::Type as PhysicalType, - file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics}, + file::{ + metadata::RowGroupMetaData, properties::WriterProperties, + statistics::Statistics as ParquetStatistics, + }, schema::types::SchemaDescPtr, }; + /// writes each RecordBatch as an individual parquet file and then + /// reads it back in to the named location. 
+ async fn round_trip_to_parquet( + batches: Vec<RecordBatch>, + projection: Option<Vec<usize>>, + schema: Option<SchemaRef>, + ) -> Vec<RecordBatch> { + // When vec is dropped, temp files are deleted + let files: Vec<_> = batches + .into_iter() + .map(|batch| { + let output = tempfile::NamedTempFile::new().expect("creating temp file"); + + let props = WriterProperties::builder().build(); + let file: std::fs::File = (*output.as_file()) + .try_clone() + .expect("cloning file descriptor"); + let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)) + .expect("creating writer"); + + writer.write(&batch).expect("Writing batch"); + writer.close().unwrap(); + output + }) + .collect(); + + let file_names: Vec<_> = files + .iter() + .map(|t| t.path().to_string_lossy().to_string()) + .collect(); + + // Now, read the files back in + let file_groups: Vec<_> = file_names + .iter() + .map(|name| local_unpartitioned_file(name.clone())) + .collect(); + + // Infer the schema (if not provided) + let file_schema = match schema { + Some(provided_schema) => provided_schema, + None => ParquetFormat::default() + .infer_schema(local_object_reader_stream(file_names)) + .await + .expect("inferring schema"), + }; + + // prepare the scan + let parquet_exec = ParquetExec::new( + FileScanConfig { + object_store: Arc::new(LocalFileSystem {}), + file_groups: vec![file_groups], + file_schema, + statistics: Statistics::default(), + projection, + limit: None, + table_partition_cols: vec![], + }, + None, + ); + + let runtime = Arc::new(RuntimeEnv::default()); + collect(Arc::new(parquet_exec), runtime) + .await + .expect("reading parquet data") + } + + // Add a new column with the specified field name to the RecordBatch + fn add_to_batch( + batch: &RecordBatch, + field_name: &str, + array: ArrayRef, + ) -> RecordBatch { + let mut fields = batch.schema().fields().clone(); + fields.push(Field::new(field_name, array.data_type().clone(), true)); + let schema = 
Arc::new(Schema::new(fields)); + + let mut columns = batch.columns().to_vec(); + columns.push(array); + RecordBatch::try_new(schema, columns).expect("error; creating record batch") + } + + fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch { + columns.into_iter().fold( + RecordBatch::new_empty(Arc::new(Schema::new(vec![]))), + |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()), + ) + } + + #[tokio::test] + async fn evolved_schema() { + let c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + // batch1: c1(string) + let batch1 = add_to_batch( + &RecordBatch::new_empty(Arc::new(Schema::new(vec![]))), + "c1", + c1, + ); + + // batch2: c1(string) and c2(int64) + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + let batch2 = add_to_batch(&batch1, "c2", c2); + + // batch3: c1(string) and c3(int8) + let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None])); + let batch3 = add_to_batch(&batch1, "c3", c3); + + // read/write them files: + let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None, None).await; + let expected = vec![ + "+-----+----+----+", + "| c1 | c2 | c3 |", + "+-----+----+----+", + "| | | |", + "| | | 20 |", + "| | 2 | |", + "| Foo | | |", + "| Foo | | 10 |", + "| Foo | 1 | |", + "| bar | | |", + "| bar | | |", + "| bar | | |", + "+-----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &read); + } + + #[tokio::test] + async fn evolved_schema_inconsistent_order() { + let c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + + let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None])); + + // batch1: c1(string), c2(int64), c3(int8) + let batch1 = create_batch(vec![ + ("c1", c1.clone()), + ("c2", c2.clone()), + ("c3", c3.clone()), + ]); + + // batch2: c3(int8), c2(int64), c1(string) + let batch2 = 
create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]); + + // read/write them files: + let read = round_trip_to_parquet(vec![batch1, batch2], None, None).await; + let expected = vec![ + "+-----+----+----+", + "| c1 | c2 | c3 |", + "+-----+----+----+", + "| Foo | 1 | 10 |", + "| | 2 | 20 |", + "| bar | | |", + "| Foo | 1 | 10 |", + "| | 2 | 20 |", + "| bar | | |", + "+-----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &read); + } + + #[tokio::test] + async fn evolved_schema_intersection() { + let c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + + let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None])); + + // batch1: c1(string), c2(int64), c3(int8) + let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]); + + // batch2: c3(int8), c2(int64), c1(string) + let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]); + + // read/write them files: + let read = round_trip_to_parquet(vec![batch1, batch2], None, None).await; + let expected = vec![ + "+-----+----+----+", + "| c1 | c3 | c2 |", + "+-----+----+----+", + "| Foo | 10 | |", + "| | 20 | |", + "| bar | | |", + "| | 10 | 1 |", + "| | 20 | 2 |", + "| | | |", + "+-----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &read); + } + + #[tokio::test] + async fn evolved_schema_projection() { + let c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + + let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None])); + + let c4: ArrayRef = + Arc::new(StringArray::from(vec![Some("baz"), Some("boo"), None])); + + // batch1: c1(string), c2(int64), c3(int8) + let batch1 = create_batch(vec![ + ("c1", c1.clone()), + ("c2", c2.clone()), + ("c3", c3.clone()), + ]); + + // batch2: c3(int8), c2(int64), c1(string), c4(string) + let batch2 = 
create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1), ("c4", c4)]); + + // read/write them files: + let read = + round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]), None).await; + let expected = vec![ + "+-----+-----+", + "| c1 | c4 |", + "+-----+-----+", + "| Foo | baz |", + "| | boo |", + "| bar | |", + "| Foo | |", + "| | |", + "| bar | |", + "+-----+-----+", + ]; + assert_batches_sorted_eq!(expected, &read); + } + + #[tokio::test] + async fn evolved_schema_incompatible_types() { Review comment: That is what I tried to do originally, but the issue is that the panic occurs on the reader thread. In effect, it just causes the read on that partition to be dropped. We could test for a panic by calling `read_partition` directly from the test, if you'd rather do it that way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org