alamb commented on a change in pull request #1622:
URL: https://github.com/apache/arrow-datafusion/pull/1622#discussion_r790137229



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -457,22 +518,313 @@ fn read_partition(
 
 #[cfg(test)]
 mod tests {
-    use crate::datasource::{
-        file_format::{parquet::ParquetFormat, FileFormat},
-        object_store::local::{
-            local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
+    use crate::{
+        assert_batches_sorted_eq,
+        datasource::{
+            file_format::{parquet::ParquetFormat, FileFormat},
+            object_store::local::{
+                local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
+            },
         },
+        physical_plan::collect,
     };
 
     use super::*;
-    use arrow::datatypes::{DataType, Field};
+    use arrow::{
+        array::{Int64Array, Int8Array, StringArray},
+        datatypes::{DataType, Field},
+    };
     use futures::StreamExt;
     use parquet::{
+        arrow::ArrowWriter,
         basic::Type as PhysicalType,
-        file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
+        file::{
+            metadata::RowGroupMetaData, properties::WriterProperties,
+            statistics::Statistics as ParquetStatistics,
+        },
         schema::types::SchemaDescPtr,
     };
 
+    /// Writes each RecordBatch as an individual parquet file and then
+    /// reads them back, returning the collected RecordBatches.
+    async fn round_trip_to_parquet(
+        batches: Vec<RecordBatch>,
+        projection: Option<Vec<usize>>,
+        schema: Option<SchemaRef>,
+    ) -> Vec<RecordBatch> {
+        // When vec is dropped, temp files are deleted
+        let files: Vec<_> = batches
+            .into_iter()
+            .map(|batch| {
+                let output = tempfile::NamedTempFile::new().expect("creating temp file");
+
+                let props = WriterProperties::builder().build();
+                let file: std::fs::File = (*output.as_file())
+                    .try_clone()
+                    .expect("cloning file descriptor");
+                let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))
+                    .expect("creating writer");
+
+                writer.write(&batch).expect("Writing batch");
+                writer.close().unwrap();
+                output
+            })
+            .collect();
+
+        let file_names: Vec<_> = files
+            .iter()
+            .map(|t| t.path().to_string_lossy().to_string())
+            .collect();
+
+        // Now, read the files back in
+        let file_groups: Vec<_> = file_names
+            .iter()
+            .map(|name| local_unpartitioned_file(name.clone()))
+            .collect();
+
+        // Infer the schema (if not provided)
+        let file_schema = match schema {
+            Some(provided_schema) => provided_schema,
+            None => ParquetFormat::default()
+                .infer_schema(local_object_reader_stream(file_names))
+                .await
+                .expect("inferring schema"),
+        };
+
+        // prepare the scan
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                object_store: Arc::new(LocalFileSystem {}),
+                file_groups: vec![file_groups],
+                file_schema,
+                statistics: Statistics::default(),
+                projection,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            None,
+        );
+
+        let runtime = Arc::new(RuntimeEnv::default());
+        collect(Arc::new(parquet_exec), runtime)
+            .await
+            .expect("reading parquet data")
+    }
+
+    // Add a new column with the specified field name to the RecordBatch
+    fn add_to_batch(
+        batch: &RecordBatch,
+        field_name: &str,
+        array: ArrayRef,
+    ) -> RecordBatch {
+        let mut fields = batch.schema().fields().clone();
+        fields.push(Field::new(field_name, array.data_type().clone(), true));
+        let schema = Arc::new(Schema::new(fields));
+
+        let mut columns = batch.columns().to_vec();
+        columns.push(array);
+        RecordBatch::try_new(schema, columns).expect("error creating record batch")
+    }
+
+    fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
+        columns.into_iter().fold(
+            RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+            |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
+        )
+    }
+
+    #[tokio::test]
+    async fn evolved_schema() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+        // batch1: c1(string)
+        let batch1 = add_to_batch(
+            &RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+            "c1",
+            c1,
+        );
+
+        // batch2: c1(string) and c2(int64)
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+        let batch2 = add_to_batch(&batch1, "c2", c2);
+
+        // batch3: c1(string) and c3(int8)
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+        let batch3 = add_to_batch(&batch1, "c3", c3);
+
+        // write the files out and read them back:
+        let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None, None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c2 | c3 |",
+            "+-----+----+----+",
+            "|     |    |    |",
+            "|     |    | 20 |",
+            "|     | 2  |    |",
+            "| Foo |    |    |",
+            "| Foo |    | 10 |",
+            "| Foo | 1  |    |",
+            "| bar |    |    |",
+            "| bar |    |    |",
+            "| bar |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_inconsistent_order() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
+
+        // write the files out and read them back:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c2 | c3 |",
+            "+-----+----+----+",
+            "| Foo | 1  | 10 |",
+            "|     | 2  | 20 |",
+            "| bar |    |    |",
+            "| Foo | 1  | 10 |",
+            "|     | 2  | 20 |",
+            "| bar |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_intersection() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c3(int8)
+        let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
+
+        // batch2: c3(int8), c2(int64)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
+
+        // write the files out and read them back:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c3 | c2 |",
+            "+-----+----+----+",
+            "| Foo | 10 |    |",
+            "|     | 20 |    |",
+            "| bar |    |    |",
+            "|     | 10 | 1  |",
+            "|     | 20 | 2  |",
+            "|     |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_projection() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        let c4: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("baz"), Some("boo"), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string), c4(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1), ("c4", c4)]);
+
+        // write the files out and read them back:
+        let read =
+            round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]), None).await;

Review comment:
       Using the projection index `3` here is a good choice 👍 (as it requires projecting on the merged schema).
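
   For illustration only (this is not code from the PR): index `3` is interesting because the first file's schema has just three columns, so `c4` only receives an index once the per-file schemas are merged. A minimal sketch, assuming the `arrow` crate and the first-seen-order merge shown by the expected output above:

   ```rust
   use arrow::datatypes::{DataType, Field, Schema};

   fn main() {
       // merged schema of batch1 (c1, c2, c3) and batch2 (c3, c2, c1, c4)
       let merged = Schema::new(vec![
           Field::new("c1", DataType::Utf8, true),
           Field::new("c2", DataType::Int64, true),
           Field::new("c3", DataType::Int8, true),
           Field::new("c4", DataType::Utf8, true),
       ]);
       // projection [0, 3] selects c1 and c4; c4 has no index in the first
       // file's schema, so the projection only makes sense against the merged
       // schema and must be re-mapped per file before reading.
       assert_eq!(merged.index_of("c4").unwrap(), 3);
   }
   ```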

##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -385,9 +389,33 @@ fn build_row_group_predicate(
     }
 }
 
+// Map projections from the schema which merges all file schemas to projections on a particular
+// file
+fn map_projections(
+    merged_schema: &Schema,
+    file_schema: &Schema,
+    projections: &[usize],
+) -> Result<Vec<usize>> {
+    let mut mapped: Vec<usize> = vec![];
+    for idx in projections {
+        let field = merged_schema.field(*idx);
+        if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
+            if file_schema.field(mapped_idx).data_type() == field.data_type() {
+                mapped.push(mapped_idx)
+            } else {
+                let msg = format!("Failed to map column projection for field {}. Incompatible data types {:?} and {:?}", field.name(), file_schema.field(mapped_idx).data_type(), field.data_type());
+                error!("{}", msg);
+                return Err(DataFusionError::ParquetError(ParquetError::General(msg)));

Review comment:
       I recommend:
   1. Using an `info` message (as depending on the use case this may signal a user error rather than something to investigate)
   2. Using a `DataFusionError` rather than one from Parquet (the rationale being that this error comes from DataFusion, and is not related to being able to read the parquet files)
   
   ```suggestion
                let msg = format!("Failed to map column projection for field {}. Incompatible data types {:?} and {:?}", field.name(), file_schema.field(mapped_idx).data_type(), field.data_type());
                info!("{}", msg);
                return Err(DataFusionError::Execution(msg));
   ```
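
   As additional context (not part of the PR or this review), a self-contained sketch of the mapping idea in the hunk above, with the error reduced to a plain `String` so it compiles on its own; in the PR this would be the `DataFusionError::Execution` suggested here. The function name `map_projections_sketch` is hypothetical:

   ```rust
   use arrow::datatypes::{DataType, Field, Schema};

   // Translate indices defined against the merged schema into indices on one
   // file's schema, skipping columns the file lacks and rejecting type conflicts.
   fn map_projections_sketch(
       merged_schema: &Schema,
       file_schema: &Schema,
       projections: &[usize],
   ) -> Result<Vec<usize>, String> {
       let mut mapped = Vec::with_capacity(projections.len());
       for &idx in projections {
           let field = merged_schema.field(idx);
           if let Ok(mapped_idx) = file_schema.index_of(field.name()) {
               if file_schema.field(mapped_idx).data_type() == field.data_type() {
                   mapped.push(mapped_idx);
               } else {
                   return Err(format!(
                       "Failed to map column projection for field {}",
                       field.name()
                   ));
               }
           }
           // a column missing from this file is simply skipped here; the reader
           // later pads it with nulls when adapting batches to the merged schema
       }
       Ok(mapped)
   }

   fn main() {
       let merged = Schema::new(vec![
           Field::new("c1", DataType::Utf8, true),
           Field::new("c2", DataType::Int64, true),
           Field::new("c4", DataType::Utf8, true),
       ]);
       // a file that only contains c2 and c1, in that order
       let file = Schema::new(vec![
           Field::new("c2", DataType::Int64, true),
           Field::new("c1", DataType::Utf8, true),
       ]);
       // projection [0, 2] on the merged schema selects (c1, c4): c1 maps to
       // index 1 of this file, and c4 is absent, so it is dropped from the result
       assert_eq!(map_projections_sketch(&merged, &file, &[0, 2]), Ok(vec![1]));
   }
   ```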

##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -457,22 +518,313 @@ fn read_partition(
 
 #[cfg(test)]
 mod tests {
-    use crate::datasource::{
-        file_format::{parquet::ParquetFormat, FileFormat},
-        object_store::local::{
-            local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
+    use crate::{
+        assert_batches_sorted_eq,
+        datasource::{
+            file_format::{parquet::ParquetFormat, FileFormat},
+            object_store::local::{
+                local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
+            },
         },
+        physical_plan::collect,
     };
 
     use super::*;
-    use arrow::datatypes::{DataType, Field};
+    use arrow::{
+        array::{Int64Array, Int8Array, StringArray},
+        datatypes::{DataType, Field},
+    };
     use futures::StreamExt;
     use parquet::{
+        arrow::ArrowWriter,
         basic::Type as PhysicalType,
-        file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
+        file::{
+            metadata::RowGroupMetaData, properties::WriterProperties,
+            statistics::Statistics as ParquetStatistics,
+        },
         schema::types::SchemaDescPtr,
     };
 
+    /// Writes each RecordBatch as an individual parquet file and then
+    /// reads them back, returning the collected RecordBatches.
+    async fn round_trip_to_parquet(
+        batches: Vec<RecordBatch>,
+        projection: Option<Vec<usize>>,
+        schema: Option<SchemaRef>,
+    ) -> Vec<RecordBatch> {
+        // When vec is dropped, temp files are deleted
+        let files: Vec<_> = batches
+            .into_iter()
+            .map(|batch| {
+                let output = tempfile::NamedTempFile::new().expect("creating temp file");
+
+                let props = WriterProperties::builder().build();
+                let file: std::fs::File = (*output.as_file())
+                    .try_clone()
+                    .expect("cloning file descriptor");
+                let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props))
+                    .expect("creating writer");
+
+                writer.write(&batch).expect("Writing batch");
+                writer.close().unwrap();
+                output
+            })
+            .collect();
+
+        let file_names: Vec<_> = files
+            .iter()
+            .map(|t| t.path().to_string_lossy().to_string())
+            .collect();
+
+        // Now, read the files back in
+        let file_groups: Vec<_> = file_names
+            .iter()
+            .map(|name| local_unpartitioned_file(name.clone()))
+            .collect();
+
+        // Infer the schema (if not provided)
+        let file_schema = match schema {
+            Some(provided_schema) => provided_schema,
+            None => ParquetFormat::default()
+                .infer_schema(local_object_reader_stream(file_names))
+                .await
+                .expect("inferring schema"),
+        };
+
+        // prepare the scan
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                object_store: Arc::new(LocalFileSystem {}),
+                file_groups: vec![file_groups],
+                file_schema,
+                statistics: Statistics::default(),
+                projection,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            None,
+        );
+
+        let runtime = Arc::new(RuntimeEnv::default());
+        collect(Arc::new(parquet_exec), runtime)
+            .await
+            .expect("reading parquet data")
+    }
+
+    // Add a new column with the specified field name to the RecordBatch
+    fn add_to_batch(
+        batch: &RecordBatch,
+        field_name: &str,
+        array: ArrayRef,
+    ) -> RecordBatch {
+        let mut fields = batch.schema().fields().clone();
+        fields.push(Field::new(field_name, array.data_type().clone(), true));
+        let schema = Arc::new(Schema::new(fields));
+
+        let mut columns = batch.columns().to_vec();
+        columns.push(array);
+        RecordBatch::try_new(schema, columns).expect("error creating record batch")
+    }
+
+    fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
+        columns.into_iter().fold(
+            RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+            |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
+        )
+    }
+
+    #[tokio::test]
+    async fn evolved_schema() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+        // batch1: c1(string)
+        let batch1 = add_to_batch(
+            &RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+            "c1",
+            c1,
+        );
+
+        // batch2: c1(string) and c2(int64)
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+        let batch2 = add_to_batch(&batch1, "c2", c2);
+
+        // batch3: c1(string) and c3(int8)
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+        let batch3 = add_to_batch(&batch1, "c3", c3);
+
+        // write the files out and read them back:
+        let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None, None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c2 | c3 |",
+            "+-----+----+----+",
+            "|     |    |    |",
+            "|     |    | 20 |",
+            "|     | 2  |    |",
+            "| Foo |    |    |",
+            "| Foo |    | 10 |",
+            "| Foo | 1  |    |",
+            "| bar |    |    |",
+            "| bar |    |    |",
+            "| bar |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_inconsistent_order() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
+
+        // write the files out and read them back:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c2 | c3 |",
+            "+-----+----+----+",
+            "| Foo | 1  | 10 |",
+            "|     | 2  | 20 |",
+            "| bar |    |    |",
+            "| Foo | 1  | 10 |",
+            "|     | 2  | 20 |",
+            "| bar |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_intersection() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        // batch1: c1(string), c3(int8)
+        let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
+
+        // batch2: c3(int8), c2(int64)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
+
+        // write the files out and read them back:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c3 | c2 |",
+            "+-----+----+----+",
+            "| Foo | 10 |    |",
+            "|     | 20 |    |",
+            "| bar |    |    |",
+            "|     | 10 | 1  |",
+            "|     | 20 | 2  |",
+            "|     |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_projection() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+
+        let c4: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("baz"), Some("boo"), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string), c4(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1), ("c4", c4)]);
+
+        // write the files out and read them back:
+        let read =
+            round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]), None).await;
+        let expected = vec![
+            "+-----+-----+",
+            "| c1  | c4  |",
+            "+-----+-----+",
+            "| Foo | baz |",
+            "|     | boo |",
+            "| bar |     |",
+            "| Foo |     |",
+            "|     |     |",
+            "| bar |     |",
+            "+-----+-----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_incompatible_types() {

Review comment:
       This test name implies it is for incompatible types, but then merges two 
files with compatible types.
   
   Perhaps you could switch one of the column types and then assert a panic like
    
   ```suggestion
       #[should_panic(expected = "incorrect types")]
       async fn evolved_schema_incompatible_types() {
   ```
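
   One possible shape for such a test, as a hedged sketch built on the helpers defined earlier in this module (`create_batch`, `round_trip_to_parquet`); the exact panic message depends on where the type conflict surfaces (schema inference vs. projection mapping), so no `expected` string is pinned here:

   ```rust
    #[tokio::test]
    #[should_panic]
    async fn evolved_schema_incompatible_types() {
        let c1_string: ArrayRef =
            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
        let c1_int: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

        // two files that disagree on the type of c1
        let batch1 = create_batch(vec![("c1", c1_string)]);
        let batch2 = create_batch(vec![("c1", c1_int)]);

        // merging the file schemas (or mapping the projection) should fail,
        // which the helper surfaces as a panic via its expect() calls
        round_trip_to_parquet(vec![batch1, batch2], Some(vec![0]), None).await;
    }
   ```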
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

