thinkharderdev commented on a change in pull request #1622:
URL: https://github.com/apache/arrow-datafusion/pull/1622#discussion_r790151401



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -457,22 +518,313 @@ fn read_partition(
 
 #[cfg(test)]
 mod tests {
-    use crate::datasource::{
-        file_format::{parquet::ParquetFormat, FileFormat},
-        object_store::local::{
-            local_object_reader_stream, local_unpartitioned_file, 
LocalFileSystem,
+    use crate::{
+        assert_batches_sorted_eq,
+        datasource::{
+            file_format::{parquet::ParquetFormat, FileFormat},
+            object_store::local::{
+                local_object_reader_stream, local_unpartitioned_file, 
LocalFileSystem,
+            },
         },
+        physical_plan::collect,
     };
 
     use super::*;
-    use arrow::datatypes::{DataType, Field};
+    use arrow::{
+        array::{Int64Array, Int8Array, StringArray},
+        datatypes::{DataType, Field},
+    };
     use futures::StreamExt;
     use parquet::{
+        arrow::ArrowWriter,
         basic::Type as PhysicalType,
-        file::{metadata::RowGroupMetaData, statistics::Statistics as 
ParquetStatistics},
+        file::{
+            metadata::RowGroupMetaData, properties::WriterProperties,
+            statistics::Statistics as ParquetStatistics,
+        },
         schema::types::SchemaDescPtr,
     };
 
+    /// writes each RecordBatch as an individual parquet file and then
+    /// reads it back in to the named location.
+    async fn round_trip_to_parquet(
+        batches: Vec<RecordBatch>,
+        projection: Option<Vec<usize>>,
+        schema: Option<SchemaRef>,
+    ) -> Vec<RecordBatch> {
+        // When vec is dropped, temp files are deleted
+        let files: Vec<_> = batches
+            .into_iter()
+            .map(|batch| {
+                let output = tempfile::NamedTempFile::new().expect("creating 
temp file");
+
+                let props = WriterProperties::builder().build();
+                let file: std::fs::File = (*output.as_file())
+                    .try_clone()
+                    .expect("cloning file descriptor");
+                let mut writer = ArrowWriter::try_new(file, batch.schema(), 
Some(props))
+                    .expect("creating writer");
+
+                writer.write(&batch).expect("Writing batch");
+                writer.close().unwrap();
+                output
+            })
+            .collect();
+
+        let file_names: Vec<_> = files
+            .iter()
+            .map(|t| t.path().to_string_lossy().to_string())
+            .collect();
+
+        // Now, read the files back in
+        let file_groups: Vec<_> = file_names
+            .iter()
+            .map(|name| local_unpartitioned_file(name.clone()))
+            .collect();
+
+        // Infer the schema (if not provided)
+        let file_schema = match schema {
+            Some(provided_schema) => provided_schema,
+            None => ParquetFormat::default()
+                .infer_schema(local_object_reader_stream(file_names))
+                .await
+                .expect("inferring schema"),
+        };
+
+        // prepare the scan
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                object_store: Arc::new(LocalFileSystem {}),
+                file_groups: vec![file_groups],
+                file_schema,
+                statistics: Statistics::default(),
+                projection,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            None,
+        );
+
+        let runtime = Arc::new(RuntimeEnv::default());
+        collect(Arc::new(parquet_exec), runtime)
+            .await
+            .expect("reading parquet data")
+    }
+
+    // Add a new column with the specified field name to the RecordBatch
+    fn add_to_batch(
+        batch: &RecordBatch,
+        field_name: &str,
+        array: ArrayRef,
+    ) -> RecordBatch {
+        let mut fields = batch.schema().fields().clone();
+        fields.push(Field::new(field_name, array.data_type().clone(), true));
+        let schema = Arc::new(Schema::new(fields));
+
+        let mut columns = batch.columns().to_vec();
+        columns.push(array);
+        RecordBatch::try_new(schema, columns).expect("error; creating record 
batch")
+    }
+
+    fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
+        columns.into_iter().fold(
+            RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+            |batch, (field_name, arr)| add_to_batch(&batch, field_name, 
arr.clone()),
+        )
+    }
+
+    #[tokio::test]
+    async fn evolved_schema() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+        // batch1: c1(string)
+        let batch1 = add_to_batch(
+            &RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+            "c1",
+            c1,
+        );
+
+        // batch2: c1(string) and c2(int64)
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), 
None]));
+        let batch2 = add_to_batch(&batch1, "c2", c2);
+
+        // batch3: c1(string) and c3(int8)
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), 
None]));
+        let batch3 = add_to_batch(&batch1, "c3", c3);
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None, 
None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c2 | c3 |",
+            "+-----+----+----+",
+            "|     |    |    |",
+            "|     |    | 20 |",
+            "|     | 2  |    |",
+            "| Foo |    |    |",
+            "| Foo |    | 10 |",
+            "| Foo | 1  |    |",
+            "| bar |    |    |",
+            "| bar |    |    |",
+            "| bar |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_inconsistent_order() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), 
None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), 
None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, 
None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c2 | c3 |",
+            "+-----+----+----+",
+            "| Foo | 1  | 10 |",
+            "|     | 2  | 20 |",
+            "| bar |    |    |",
+            "| Foo | 1  | 10 |",
+            "|     | 2  | 20 |",
+            "| bar |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_intersection() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), 
None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), 
None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, 
None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c3 | c2 |",
+            "+-----+----+----+",
+            "| Foo | 10 |    |",
+            "|     | 20 |    |",
+            "| bar |    |    |",
+            "|     | 10 | 1  |",
+            "|     | 20 | 2  |",
+            "|     |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_projection() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), 
None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), 
None]));
+
+        let c4: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("baz"), Some("boo"), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string), c4(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1), 
("c4", c4)]);
+
+        // read/write them files:
+        let read =
+            round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]), 
None).await;
+        let expected = vec![
+            "+-----+-----+",
+            "| c1  | c4  |",
+            "+-----+-----+",
+            "| Foo | baz |",
+            "|     | boo |",
+            "| bar |     |",
+            "| Foo |     |",
+            "|     |     |",
+            "| bar |     |",
+            "+-----+-----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_incompatible_types() {

Review comment:
       That is what I tried to do originally but the issue is that the panic is 
on the reader thread. In effect it basically just causes the read on that 
partition to get dropped. We could test for a panic by calling `read_partition` 
directly from the test if you'd rather do it that way. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to