This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new deaa8ac  Handle merging of evolved schemas in ParquetExec (#1622)
deaa8ac is described below

commit deaa8acba785a0233a58ced311e5f092f4744bc1
Author: Dan Harris <[email protected]>
AuthorDate: Sun Jan 23 08:34:49 2022 -0500

    Handle merging of evolved schemas in ParquetExec (#1622)
    
    * Handle merging of evolved schemas in ParquetExec
    
    * Handle merging of evolved schemas in ParquetExec
    
    * Linting fix
    
    * Avoid unnecessary search by field name
    
    * Add round trip parquet testing
    
    * PR comments:
    
    1. Add additional test cases
    
    2. Map projected column indexes in all cases
    
    3. Raise an explicit error in the case where there a conflict between a
       file schema and the merged schema.
    
    * Remove redundant test case and revert submodule changes
    
    * Linting
    
    * Linting
    
    * Using a info message (as depending on the usecase this may signal a user 
error rather than something to investigate) Using a DataFusionError rather than 
one from Parquet (the rationale being that this error comes from DataFusion, 
and is not related to being able to read the parquet files)
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * PR comments: Clarify incompatible schema test
    
    * Linting
    
    * Clippy
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/src/datasource/file_format/parquet.rs   |  22 +-
 .../src/physical_plan/file_format/parquet.rs       | 373 ++++++++++++++++++++-
 2 files changed, 374 insertions(+), 21 deletions(-)

diff --git a/datafusion/src/datasource/file_format/parquet.rs 
b/datafusion/src/datasource/file_format/parquet.rs
index eedb4c9..4afb2f5 100644
--- a/datafusion/src/datasource/file_format/parquet.rs
+++ b/datafusion/src/datasource/file_format/parquet.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;
 use arrow::datatypes::Schema;
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
-use futures::stream::StreamExt;
+use futures::TryStreamExt;
 use parquet::arrow::ArrowReader;
 use parquet::arrow::ParquetFileArrowReader;
 use parquet::errors::ParquetError;
@@ -87,16 +87,15 @@ impl FileFormat for ParquetFormat {
         self
     }
 
-    async fn infer_schema(&self, mut readers: ObjectReaderStream) -> 
Result<SchemaRef> {
-        // We currently get the schema information from the first file rather 
than do
-        // schema merging and this is a limitation.
-        // See https://issues.apache.org/jira/browse/ARROW-11017
-        let first_file = readers
-            .next()
-            .await
-            .ok_or_else(|| DataFusionError::Plan("No data file 
found".to_owned()))??;
-        let schema = fetch_schema(first_file)?;
-        Ok(Arc::new(schema))
+    async fn infer_schema(&self, readers: ObjectReaderStream) -> 
Result<SchemaRef> {
+        let merged_schema = readers
+            .try_fold(Schema::empty(), |acc, reader| async {
+                let next_schema = fetch_schema(reader);
+                Schema::try_merge([acc, next_schema?])
+                    .map_err(DataFusionError::ArrowError)
+            })
+            .await?;
+        Ok(Arc::new(merged_schema))
     }
 
     async fn infer_stats(&self, reader: Arc<dyn ObjectReader>) -> 
Result<Statistics> {
@@ -367,6 +366,7 @@ mod tests {
     };
 
     use super::*;
+
     use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use arrow::array::{
         BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs 
b/datafusion/src/physical_plan/file_format/parquet.rs
index 5768d0a..d240fe2 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -44,13 +44,14 @@ use arrow::{
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
-use log::debug;
+use log::{debug, info};
 use parquet::file::{
     metadata::RowGroupMetaData,
     reader::{FileReader, SerializedFileReader},
     statistics::Statistics as ParquetStatistics,
 };
 
+use arrow::array::new_null_array;
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
 
@@ -214,9 +215,11 @@ impl ExecutionPlan for ParquetExec {
             &self.base_config.table_partition_cols,
         );
 
+        let file_schema_ref = self.base_config().file_schema.clone();
         let join_handle = task::spawn_blocking(move || {
             if let Err(e) = read_partition(
                 object_store.as_ref(),
+                file_schema_ref,
                 partition_index,
                 partition,
                 metrics,
@@ -414,9 +417,33 @@ fn build_row_group_predicate(
     }
 }
 
+// Map projections from the schema which merges all file schemas to 
projections on a particular
+// file
+fn map_projections(
+    merged_schema: &Schema,
+    file_schema: &Schema,
+    projections: &[usize],
+) -> Result<Vec<usize>> {
+    let mut mapped: Vec<usize> = vec![];
+    for idx in projections {
+        let field = merged_schema.field(*idx);
+        if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
+            if file_schema.field(mapped_idx).data_type() == field.data_type() {
+                mapped.push(mapped_idx)
+            } else {
+                let msg = format!("Failed to map column projection for field 
{}. Incompatible data types {:?} and {:?}", field.name(), 
file_schema.field(mapped_idx).data_type(), field.data_type());
+                info!("{}", msg);
+                return Err(DataFusionError::Execution(msg));
+            }
+        }
+    }
+    Ok(mapped)
+}
+
 #[allow(clippy::too_many_arguments)]
 fn read_partition(
     object_store: &dyn ObjectStore,
+    file_schema: SchemaRef,
     partition_index: usize,
     partition: Vec<PartitionedFile>,
     metrics: ExecutionPlanMetricsSet,
@@ -429,6 +456,8 @@ fn read_partition(
 ) -> Result<()> {
     let mut total_rows = 0;
     'outer: for partitioned_file in partition {
+        debug!("Reading file {}", &partitioned_file.file_meta.path());
+
         let file_metrics = ParquetFileMetrics::new(
             partition_index,
             &*partitioned_file.file_meta.path(),
@@ -446,15 +475,46 @@ fn read_partition(
             );
             file_reader.filter_row_groups(&row_group_predicate);
         }
+
         let mut arrow_reader = 
ParquetFileArrowReader::new(Arc::new(file_reader));
-        let mut batch_reader = arrow_reader
-            .get_record_reader_by_columns(projection.to_owned(), batch_size)?;
+        let mapped_projections =
+            map_projections(&file_schema, &arrow_reader.get_schema()?, 
projection)?;
+
+        let mut batch_reader =
+            arrow_reader.get_record_reader_by_columns(mapped_projections, 
batch_size)?;
         loop {
             match batch_reader.next() {
                 Some(Ok(batch)) => {
+                    let total_cols = &file_schema.fields().len();
+                    let batch_rows = batch.num_rows();
                     total_rows += batch.num_rows();
+
+                    let batch_schema = batch.schema();
+
+                    let mut cols: Vec<ArrayRef> = 
Vec::with_capacity(*total_cols);
+                    let batch_cols = batch.columns().to_vec();
+
+                    for field_idx in projection {
+                        let merged_field = &file_schema.fields()[*field_idx];
+                        if let Some((batch_idx, _name)) =
+                            
batch_schema.column_with_name(merged_field.name().as_str())
+                        {
+                            cols.push(batch_cols[batch_idx].clone());
+                        } else {
+                            cols.push(new_null_array(
+                                merged_field.data_type(),
+                                batch_rows,
+                            ))
+                        }
+                    }
+
+                    let projected_schema = 
file_schema.clone().project(projection)?;
+
+                    let merged_batch =
+                        RecordBatch::try_new(Arc::new(projected_schema), 
cols)?;
+
                     let proj_batch = partition_column_projector
-                        .project(batch, &partitioned_file.partition_values);
+                        .project(merged_batch, 
&partitioned_file.partition_values);
 
                     send_result(&response_tx, proj_batch)?;
                     if limit.map(|l| total_rows >= l).unwrap_or(false) {
@@ -486,22 +546,315 @@ fn read_partition(
 
 #[cfg(test)]
 mod tests {
-    use crate::datasource::{
-        file_format::{parquet::ParquetFormat, FileFormat},
-        object_store::local::{
-            local_object_reader_stream, local_unpartitioned_file, 
LocalFileSystem,
+    use crate::{
+        assert_batches_sorted_eq,
+        datasource::{
+            file_format::{parquet::ParquetFormat, FileFormat},
+            object_store::local::{
+                local_object_reader_stream, local_unpartitioned_file, 
LocalFileSystem,
+            },
         },
+        physical_plan::collect,
     };
 
     use super::*;
-    use arrow::datatypes::{DataType, Field};
+    use arrow::array::Float32Array;
+    use arrow::{
+        array::{Int64Array, Int8Array, StringArray},
+        datatypes::{DataType, Field},
+    };
     use futures::StreamExt;
     use parquet::{
+        arrow::ArrowWriter,
         basic::Type as PhysicalType,
-        file::{metadata::RowGroupMetaData, statistics::Statistics as 
ParquetStatistics},
+        file::{
+            metadata::RowGroupMetaData, properties::WriterProperties,
+            statistics::Statistics as ParquetStatistics,
+        },
         schema::types::SchemaDescPtr,
     };
 
+    /// writes each RecordBatch as an individual parquet file and then
+    /// reads it back in to the named location.
+    async fn round_trip_to_parquet(
+        batches: Vec<RecordBatch>,
+        projection: Option<Vec<usize>>,
+        schema: Option<SchemaRef>,
+    ) -> Vec<RecordBatch> {
+        // When vec is dropped, temp files are deleted
+        let files: Vec<_> = batches
+            .into_iter()
+            .map(|batch| {
+                let output = tempfile::NamedTempFile::new().expect("creating 
temp file");
+
+                let props = WriterProperties::builder().build();
+                let file: std::fs::File = (*output.as_file())
+                    .try_clone()
+                    .expect("cloning file descriptor");
+                let mut writer = ArrowWriter::try_new(file, batch.schema(), 
Some(props))
+                    .expect("creating writer");
+
+                writer.write(&batch).expect("Writing batch");
+                writer.close().unwrap();
+                output
+            })
+            .collect();
+
+        let file_names: Vec<_> = files
+            .iter()
+            .map(|t| t.path().to_string_lossy().to_string())
+            .collect();
+
+        // Now, read the files back in
+        let file_groups: Vec<_> = file_names
+            .iter()
+            .map(|name| local_unpartitioned_file(name.clone()))
+            .collect();
+
+        // Infer the schema (if not provided)
+        let file_schema = match schema {
+            Some(provided_schema) => provided_schema,
+            None => ParquetFormat::default()
+                .infer_schema(local_object_reader_stream(file_names))
+                .await
+                .expect("inferring schema"),
+        };
+
+        // prepare the scan
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                object_store: Arc::new(LocalFileSystem {}),
+                file_groups: vec![file_groups],
+                file_schema,
+                statistics: Statistics::default(),
+                projection,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            None,
+        );
+
+        let runtime = Arc::new(RuntimeEnv::default());
+        collect(Arc::new(parquet_exec), runtime)
+            .await
+            .expect("reading parquet data")
+    }
+
+    // Add a new column with the specified field name to the RecordBatch
+    fn add_to_batch(
+        batch: &RecordBatch,
+        field_name: &str,
+        array: ArrayRef,
+    ) -> RecordBatch {
+        let mut fields = batch.schema().fields().clone();
+        fields.push(Field::new(field_name, array.data_type().clone(), true));
+        let schema = Arc::new(Schema::new(fields));
+
+        let mut columns = batch.columns().to_vec();
+        columns.push(array);
+        RecordBatch::try_new(schema, columns).expect("error; creating record 
batch")
+    }
+
+    fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
+        columns.into_iter().fold(
+            RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+            |batch, (field_name, arr)| add_to_batch(&batch, field_name, 
arr.clone()),
+        )
+    }
+
+    #[tokio::test]
+    async fn evolved_schema() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+        // batch1: c1(string)
+        let batch1 = add_to_batch(
+            &RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+            "c1",
+            c1,
+        );
+
+        // batch2: c1(string) and c2(int64)
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), 
None]));
+        let batch2 = add_to_batch(&batch1, "c2", c2);
+
+        // batch3: c1(string) and c3(int8)
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), 
None]));
+        let batch3 = add_to_batch(&batch1, "c3", c3);
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None, 
None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c2 | c3 |",
+            "+-----+----+----+",
+            "|     |    |    |",
+            "|     |    | 20 |",
+            "|     | 2  |    |",
+            "| Foo |    |    |",
+            "| Foo |    | 10 |",
+            "| Foo | 1  |    |",
+            "| bar |    |    |",
+            "| bar |    |    |",
+            "| bar |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_inconsistent_order() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), 
None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), 
None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, 
None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c2 | c3 |",
+            "+-----+----+----+",
+            "| Foo | 1  | 10 |",
+            "|     | 2  | 20 |",
+            "| bar |    |    |",
+            "| Foo | 1  | 10 |",
+            "|     | 2  | 20 |",
+            "| bar |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_intersection() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), 
None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), 
None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
+
+        // batch2: c3(int8), c2(int64), c1(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
+
+        // read/write them files:
+        let read = round_trip_to_parquet(vec![batch1, batch2], None, 
None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c3 | c2 |",
+            "+-----+----+----+",
+            "| Foo | 10 |    |",
+            "|     | 20 |    |",
+            "| bar |    |    |",
+            "|     | 10 | 1  |",
+            "|     | 20 | 2  |",
+            "|     |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_projection() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), 
None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), 
None]));
+
+        let c4: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("baz"), Some("boo"), None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string), c4(string)
+        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1), 
("c4", c4)]);
+
+        // read/write them files:
+        let read =
+            round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]), 
None).await;
+        let expected = vec![
+            "+-----+-----+",
+            "| c1  | c4  |",
+            "+-----+-----+",
+            "| Foo | baz |",
+            "|     | boo |",
+            "| bar |     |",
+            "| Foo |     |",
+            "|     |     |",
+            "| bar |     |",
+            "+-----+-----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[tokio::test]
+    async fn evolved_schema_incompatible_types() {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), 
None]));
+
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), 
None]));
+
+        let c4: ArrayRef =
+            Arc::new(Float32Array::from(vec![Some(1.0_f32), Some(2.0_f32), 
None]));
+
+        // batch1: c1(string), c2(int64), c3(int8)
+        let batch1 = create_batch(vec![
+            ("c1", c1.clone()),
+            ("c2", c2.clone()),
+            ("c3", c3.clone()),
+        ]);
+
+        // batch2: c3(int8), c2(int64), c1(string), c4(string)
+        let batch2 = create_batch(vec![("c3", c4), ("c2", c2), ("c1", c1)]);
+
+        let schema = Schema::new(vec![
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::Int64, true),
+            Field::new("c3", DataType::Int8, true),
+        ]);
+
+        // read/write them files:
+        let read =
+            round_trip_to_parquet(vec![batch1, batch2], None, 
Some(Arc::new(schema)))
+                .await;
+
+        // expect only the first batch to be read
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c2 | c3 |",
+            "+-----+----+----+",
+            "| Foo | 1  | 10 |",
+            "|     | 2  | 20 |",
+            "| bar |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
     #[tokio::test]
     async fn parquet_exec_with_projection() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());

Reply via email to