This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bec762  Create SchemaAdapter trait to map table schema to file schemas (#1709)
7bec762 is described below

commit 7bec762d1f1ebef4801af2eefd7a5033c474fe77
Author: Dan Harris <[email protected]>
AuthorDate: Mon Jan 31 15:41:21 2022 -0500

    Create SchemaAdapter trait to map table schema to file schemas (#1709)
    
    * Create SchemaAdapter trait to map table schema to file schemas
    
    * Linting fix
    
    * Remove commented code
---
 datafusion/src/physical_plan/file_format/avro.rs   |  64 +++++++++-
 datafusion/src/physical_plan/file_format/csv.rs    |  47 +++++++
 datafusion/src/physical_plan/file_format/json.rs   |  43 +++++++
 datafusion/src/physical_plan/file_format/mod.rs    | 140 +++++++++++++++++++++
 .../src/physical_plan/file_format/parquet.rs       |  68 ++--------
 datafusion/src/test_util.rs                        |  26 ++++
 6 files changed, 330 insertions(+), 58 deletions(-)

diff --git a/datafusion/src/physical_plan/file_format/avro.rs 
b/datafusion/src/physical_plan/file_format/avro.rs
index 9de8ffd..b389686 100644
--- a/datafusion/src/physical_plan/file_format/avro.rs
+++ b/datafusion/src/physical_plan/file_format/avro.rs
@@ -165,13 +165,14 @@ impl ExecutionPlan for AvroExec {
 #[cfg(test)]
 #[cfg(feature = "avro")]
 mod tests {
-
     use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
     use crate::datasource::object_store::local::{
         local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
     };
     use crate::scalar::ScalarValue;
+    use arrow::datatypes::{DataType, Field, Schema};
     use futures::StreamExt;
+    use sqlparser::ast::ObjectType::Schema;
 
     use super::*;
 
@@ -229,6 +230,67 @@ mod tests {
     }
 
     #[tokio::test]
+    async fn avro_exec_missing_column() -> Result<()> {
+        let testdata = crate::test_util::arrow_test_data();
+        let filename = format!("{}/avro/alltypes_plain.avro", testdata);
+        let actual_schema = AvroFormat {}
+            .infer_schema(local_object_reader_stream(vec![filename]))
+            .await?;
+
+        let mut fields = actual_schema.fields().clone();
+        fields.push(Field::new("missing_col", DataType::Int32, true));
+
+        let file_schema = Arc::new(Schema::new(fields));
+
+        let avro_exec = AvroExec::new(FileScanConfig {
+            object_store: Arc::new(LocalFileSystem {}),
+            file_groups: 
vec![vec![local_unpartitioned_file(filename.clone())]],
+            file_schema,
+            statistics: Statistics::default(),
+            // Include the missing column in the projection
+            projection: Some(vec![0, 1, 2, file_schema.fields().len()]),
+            limit: None,
+            table_partition_cols: vec![],
+        });
+        assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
+
+        let mut results = avro_exec.execute(0).await.expect("plan execution 
failed");
+        let batch = results
+            .next()
+            .await
+            .expect("plan iterator empty")
+            .expect("plan iterator returned an error");
+
+        let expected = vec![
+            "+----+----------+-------------+-------------+",
+            "| id | bool_col | tinyint_col | missing_col |",
+            "+----+----------+-------------+-------------+",
+            "| 4  | true     | 0           |             |",
+            "| 5  | false    | 1           |             |",
+            "| 6  | true     | 0           |             |",
+            "| 7  | false    | 1           |             |",
+            "| 2  | true     | 0           |             |",
+            "| 3  | false    | 1           |             |",
+            "| 0  | true     | 0           |             |",
+            "| 1  | false    | 1           |             |",
+            "+----+----------+-------------+-------------+",
+        ];
+
+        crate::assert_batches_eq!(expected, &[batch]);
+
+        let batch = results.next().await;
+        assert!(batch.is_none());
+
+        let batch = results.next().await;
+        assert!(batch.is_none());
+
+        let batch = results.next().await;
+        assert!(batch.is_none());
+
+        Ok(())
+    }
+
+    #[tokio::test]
     async fn avro_exec_with_partition() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/avro/alltypes_plain.avro", testdata);
diff --git a/datafusion/src/physical_plan/file_format/csv.rs 
b/datafusion/src/physical_plan/file_format/csv.rs
index 5cff3b6..4cf70f6 100644
--- a/datafusion/src/physical_plan/file_format/csv.rs
+++ b/datafusion/src/physical_plan/file_format/csv.rs
@@ -170,6 +170,7 @@ impl ExecutionPlan for CsvExec {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::test_util::aggr_test_schema_with_missing_col;
     use crate::{
         datasource::object_store::local::{local_unpartitioned_file, 
LocalFileSystem},
         scalar::ScalarValue,
@@ -270,6 +271,52 @@ mod tests {
     }
 
     #[tokio::test]
+    async fn csv_exec_with_missing_column() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
+        let file_schema = aggr_test_schema_with_missing_col();
+        let testdata = crate::test_util::arrow_test_data();
+        let filename = "aggregate_test_100.csv";
+        let path = format!("{}/csv/{}", testdata, filename);
+        let csv = CsvExec::new(
+            FileScanConfig {
+                object_store: Arc::new(LocalFileSystem {}),
+                file_schema,
+                file_groups: vec![vec![local_unpartitioned_file(path)]],
+                statistics: Statistics::default(),
+                projection: None,
+                limit: Some(5),
+                table_partition_cols: vec![],
+            },
+            true,
+            b',',
+        );
+        assert_eq!(14, csv.base_config.file_schema.fields().len());
+        assert_eq!(14, csv.projected_schema.fields().len());
+        assert_eq!(14, csv.schema().fields().len());
+
+        let mut it = csv.execute(0, runtime).await?;
+        let batch = it.next().await.unwrap()?;
+        assert_eq!(14, batch.num_columns());
+        assert_eq!(5, batch.num_rows());
+
+        let expected = vec![
+            
"+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+-------------+",
+            "| c1 | c2 | c3  | c4     | c5         | c6                   | c7 
 | c8    | c9         | c10                  | c11         | c12                
 | c13                            | missing_col |",
+            
"+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+-------------+",
+            "| c  | 2  | 1   | 18109  | 2033001162 | -6513304855495910254 | 25 
 | 43062 | 1491205016 | 5863949479783605708  | 0.110830784 | 0.9294097332465232 
 | 6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW |             |",
+            "| d  | 5  | -40 | 22614  | 706441268  | -7542719935673075327 | 
155 | 14337 | 3373581039 | 11720144131976083864 | 0.69632107  | 
0.3114712539863804  | C2GT5KVyOPZpgKVl110TyZO0NcJ434 |             |",
+            "| b  | 1  | 29  | -18218 | 994303988  | 5983957848665088916  | 
204 | 9489  | 3275293996 | 14857091259186476033 | 0.53840446  | 
0.17909035118828576 | AyYVExXK6AR2qUTxNZ7qRHQOVGMLcz |             |",
+            "| a  | 1  | -85 | -15154 | 1171968280 | 1919439543497968449  | 77 
 | 52286 | 774637006  | 12101411955859039553 | 0.12285209  | 0.6864391962767343 
 | 0keZ5G8BffGwgF2RwQD59TFzMStxCB |             |",
+            "| b  | 5  | -82 | 22080  | 1824882165 | 7373730676428214987  | 
208 | 34331 | 3342719438 | 3330177516592499461  | 0.82634634  | 
0.40975383525297016 | Ig1QcuKsjHXkproePdERo2w0mYzIqd |             |",
+            
"+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+-------------+",
+        ];
+
+        crate::assert_batches_eq!(expected, &[batch]);
+
+        Ok(())
+    }
+
+    #[tokio::test]
     async fn csv_exec_with_partition() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let file_schema = aggr_test_schema();
diff --git a/datafusion/src/physical_plan/file_format/json.rs 
b/datafusion/src/physical_plan/file_format/json.rs
index 0fc95d1..ac41306 100644
--- a/datafusion/src/physical_plan/file_format/json.rs
+++ b/datafusion/src/physical_plan/file_format/json.rs
@@ -137,6 +137,8 @@ impl ExecutionPlan for NdJsonExec {
 
 #[cfg(test)]
 mod tests {
+    use arrow::array::Array;
+    use arrow::datatypes::{Field, Schema};
     use futures::StreamExt;
 
     use crate::datasource::{
@@ -212,6 +214,47 @@ mod tests {
     }
 
     #[tokio::test]
+    async fn nd_json_exec_file_with_missing_column() -> Result<()> {
+        let runtime = Arc::new(RuntimeEnv::default());
+        use arrow::datatypes::DataType;
+        let path = format!("{}/1.json", TEST_DATA_BASE);
+
+        let actual_schema = infer_schema(path.clone()).await?;
+
+        let mut fields = actual_schema.fields().clone();
+        fields.push(Field::new("missing_col", DataType::Int32, true));
+        let missing_field_idx = fields.len() - 1;
+
+        let file_schema = Arc::new(Schema::new(fields));
+
+        let exec = NdJsonExec::new(FileScanConfig {
+            object_store: Arc::new(LocalFileSystem {}),
+            file_groups: vec![vec![local_unpartitioned_file(path.clone())]],
+            file_schema,
+            statistics: Statistics::default(),
+            projection: None,
+            limit: Some(3),
+            table_partition_cols: vec![],
+        });
+
+        let mut it = exec.execute(0, runtime).await?;
+        let batch = it.next().await.unwrap()?;
+
+        assert_eq!(batch.num_rows(), 3);
+        let values = batch
+            .column(missing_field_idx)
+            .as_any()
+            .downcast_ref::<arrow::array::Int32Array>()
+            .unwrap();
+        assert_eq!(values.len(), 3);
+        assert!(values.is_null(0));
+        assert!(values.is_null(1));
+        assert!(values.is_null(2));
+
+        Ok(())
+    }
+
+    #[tokio::test]
     async fn nd_json_exec_file_projection() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
         let path = format!("{}/1.json", TEST_DATA_BASE);
diff --git a/datafusion/src/physical_plan/file_format/mod.rs 
b/datafusion/src/physical_plan/file_format/mod.rs
index b655cdb..7658add 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -35,11 +35,15 @@ pub use avro::AvroExec;
 pub use csv::CsvExec;
 pub use json::NdJsonExec;
 
+use crate::error::DataFusionError;
 use crate::{
     datasource::{object_store::ObjectStore, PartitionedFile},
+    error::Result,
     scalar::ScalarValue,
 };
+use arrow::array::new_null_array;
 use lazy_static::lazy_static;
+use log::info;
 use std::{
     collections::HashMap,
     fmt::{Display, Formatter, Result as FmtResult},
@@ -165,6 +169,87 @@ impl<'a> Display for FileGroupsDisplay<'a> {
     }
 }
 
+/// A utility which can adapt file-level record batches to a table schema, which may have been
+/// obtained by merging multiple file-level schemas.
+///
+/// This is useful for enabling schema evolution in partitioned datasets.
+///
+/// This has to be done in two stages.
+///
+/// 1. Before reading the file, we have to map projected column indexes from the table schema to
+///    the file schema.
+///
+/// 2. After reading a record batch we need to map the read columns back to the expected column
+///    indexes and insert null-valued columns wherever the file schema was missing a column present
+///    in the table schema.
+#[derive(Clone, Debug)]
+pub(crate) struct SchemaAdapter {
+    /// Schema for the table
+    table_schema: SchemaRef,
+}
+
+impl SchemaAdapter {
+    pub(crate) fn new(table_schema: SchemaRef) -> SchemaAdapter {
+        Self { table_schema }
+    }
+
+    /// Map projected column indexes to the file schema. This will fail if the table schema
+    /// and the file schema contain a field with the same name and different types.
+    pub fn map_projections(
+        &self,
+        file_schema: &Schema,
+        projections: &[usize],
+    ) -> Result<Vec<usize>> {
+        let mut mapped: Vec<usize> = vec![];
+        for idx in projections {
+            let field = self.table_schema.field(*idx);
+            if let Ok(mapped_idx) = 
file_schema.index_of(field.name().as_str()) {
+                if file_schema.field(mapped_idx).data_type() == 
field.data_type() {
+                    mapped.push(mapped_idx)
+                } else {
+                    let msg = format!("Failed to map column projection for 
field {}. Incompatible data types {:?} and {:?}", field.name(), 
file_schema.field(mapped_idx).data_type(), field.data_type());
+                    info!("{}", msg);
+                    return Err(DataFusionError::Execution(msg));
+                }
+            }
+        }
+        Ok(mapped)
+    }
+
+    /// Re-order projected columns by index in record batch to match table schema column ordering. If the record
+    /// batch does not contain a column for an expected field, insert a null-valued column at the
+    /// required column index.
+    pub fn adapt_batch(
+        &self,
+        batch: RecordBatch,
+        projections: &[usize],
+    ) -> Result<RecordBatch> {
+        let batch_rows = batch.num_rows();
+
+        let batch_schema = batch.schema();
+
+        let mut cols: Vec<ArrayRef> = 
Vec::with_capacity(batch.columns().len());
+        let batch_cols = batch.columns().to_vec();
+
+        for field_idx in projections {
+            let table_field = &self.table_schema.fields()[*field_idx];
+            if let Some((batch_idx, _name)) =
+                batch_schema.column_with_name(table_field.name().as_str())
+            {
+                cols.push(batch_cols[batch_idx].clone());
+            } else {
+                cols.push(new_null_array(table_field.data_type(), batch_rows))
+            }
+        }
+
+        let projected_schema = 
Arc::new(self.table_schema.clone().project(projections)?);
+
+        let merged_batch = RecordBatch::try_new(projected_schema, cols)?;
+
+        Ok(merged_batch)
+    }
+}
+
 /// A helper that projects partition columns into the file record batches.
 ///
 /// One interesting trick is the usage of a cache for the key buffers of the 
partition column
@@ -467,6 +552,61 @@ mod tests {
         crate::assert_batches_eq!(expected, &[projected_batch]);
     }
 
+    #[test]
+    fn schema_adapter_adapt_projections() {
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::Int64, true),
+            Field::new("c3", DataType::Int8, true),
+        ]));
+
+        let file_schema = Schema::new(vec![
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::Int64, true),
+        ]);
+
+        let file_schema_2 = Arc::new(Schema::new(vec![
+            Field::new("c3", DataType::Int8, true),
+            Field::new("c2", DataType::Int64, true),
+        ]));
+
+        let file_schema_3 =
+            Arc::new(Schema::new(vec![Field::new("c3", DataType::Float32, 
true)]));
+
+        let adapter = SchemaAdapter::new(table_schema);
+
+        let projections1: Vec<usize> = vec![0, 1, 2];
+        let projections2: Vec<usize> = vec![2];
+
+        let mapped = adapter
+            .map_projections(&file_schema, projections1.as_slice())
+            .expect("mapping projections");
+
+        assert_eq!(mapped, vec![0, 1]);
+
+        let mapped = adapter
+            .map_projections(&file_schema, projections2.as_slice())
+            .expect("mapping projections");
+
+        assert!(mapped.is_empty());
+
+        let mapped = adapter
+            .map_projections(&file_schema_2, projections1.as_slice())
+            .expect("mapping projections");
+
+        assert_eq!(mapped, vec![1, 0]);
+
+        let mapped = adapter
+            .map_projections(&file_schema_2, projections2.as_slice())
+            .expect("mapping projections");
+
+        assert_eq!(mapped, vec![0]);
+
+        let mapped = adapter.map_projections(&file_schema_3, 
projections1.as_slice());
+
+        assert!(mapped.is_err());
+    }
+
     // sets default for configs that play no role in projections
     fn config_for_projection(
         file_schema: SchemaRef,
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs 
b/datafusion/src/physical_plan/file_format/parquet.rs
index 905bb1e..40acf5a 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -44,14 +44,13 @@ use arrow::{
     error::{ArrowError, Result as ArrowResult},
     record_batch::RecordBatch,
 };
-use log::{debug, info};
+use log::debug;
 use parquet::file::{
     metadata::RowGroupMetaData,
     reader::{FileReader, SerializedFileReader},
     statistics::Statistics as ParquetStatistics,
 };
 
-use arrow::array::new_null_array;
 use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
 
@@ -61,6 +60,7 @@ use tokio::{
 };
 
 use crate::execution::runtime_env::RuntimeEnv;
+use crate::physical_plan::file_format::SchemaAdapter;
 use async_trait::async_trait;
 
 use super::PartitionColumnProjector;
@@ -215,11 +215,12 @@ impl ExecutionPlan for ParquetExec {
             &self.base_config.table_partition_cols,
         );
 
-        let file_schema_ref = self.base_config().file_schema.clone();
+        let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
+
         let join_handle = task::spawn_blocking(move || {
             if let Err(e) = read_partition(
                 object_store.as_ref(),
-                file_schema_ref,
+                adapter,
                 partition_index,
                 &partition,
                 metrics,
@@ -420,33 +421,10 @@ fn build_row_group_predicate(
     }
 }
 
-// Map projections from the schema which merges all file schemas to 
projections on a particular
-// file
-fn map_projections(
-    merged_schema: &Schema,
-    file_schema: &Schema,
-    projections: &[usize],
-) -> Result<Vec<usize>> {
-    let mut mapped: Vec<usize> = vec![];
-    for idx in projections {
-        let field = merged_schema.field(*idx);
-        if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
-            if file_schema.field(mapped_idx).data_type() == field.data_type() {
-                mapped.push(mapped_idx)
-            } else {
-                let msg = format!("Failed to map column projection for field 
{}. Incompatible data types {:?} and {:?}", field.name(), 
file_schema.field(mapped_idx).data_type(), field.data_type());
-                info!("{}", msg);
-                return Err(DataFusionError::Execution(msg));
-            }
-        }
-    }
-    Ok(mapped)
-}
-
 #[allow(clippy::too_many_arguments)]
 fn read_partition(
     object_store: &dyn ObjectStore,
-    file_schema: SchemaRef,
+    schema_adapter: SchemaAdapter,
     partition_index: usize,
     partition: &[PartitionedFile],
     metrics: ExecutionPlanMetricsSet,
@@ -480,44 +458,20 @@ fn read_partition(
         }
 
         let mut arrow_reader = 
ParquetFileArrowReader::new(Arc::new(file_reader));
-        let mapped_projections =
-            map_projections(&file_schema, &arrow_reader.get_schema()?, 
projection)?;
+        let adapted_projections =
+            schema_adapter.map_projections(&arrow_reader.get_schema()?, 
projection)?;
 
         let mut batch_reader =
-            arrow_reader.get_record_reader_by_columns(mapped_projections, 
batch_size)?;
+            arrow_reader.get_record_reader_by_columns(adapted_projections, 
batch_size)?;
         loop {
             match batch_reader.next() {
                 Some(Ok(batch)) => {
-                    let total_cols = &file_schema.fields().len();
-                    let batch_rows = batch.num_rows();
                     total_rows += batch.num_rows();
 
-                    let batch_schema = batch.schema();
-
-                    let mut cols: Vec<ArrayRef> = 
Vec::with_capacity(*total_cols);
-                    let batch_cols = batch.columns().to_vec();
-
-                    for field_idx in projection {
-                        let merged_field = &file_schema.fields()[*field_idx];
-                        if let Some((batch_idx, _name)) =
-                            
batch_schema.column_with_name(merged_field.name().as_str())
-                        {
-                            cols.push(batch_cols[batch_idx].clone());
-                        } else {
-                            cols.push(new_null_array(
-                                merged_field.data_type(),
-                                batch_rows,
-                            ))
-                        }
-                    }
-
-                    let projected_schema = 
file_schema.clone().project(projection)?;
-
-                    let merged_batch =
-                        RecordBatch::try_new(Arc::new(projected_schema), 
cols)?;
+                    let adapted_batch = schema_adapter.adapt_batch(batch, 
projection)?;
 
                     let proj_batch = partition_column_projector
-                        .project(merged_batch, 
&partitioned_file.partition_values);
+                        .project(adapted_batch, 
&partitioned_file.partition_values);
 
                     send_result(&response_tx, proj_batch)?;
                     if limit.map(|l| total_rows >= l).unwrap_or(false) {
diff --git a/datafusion/src/test_util.rs b/datafusion/src/test_util.rs
index af66503..8ee0298 100644
--- a/datafusion/src/test_util.rs
+++ b/datafusion/src/test_util.rs
@@ -257,6 +257,32 @@ pub fn aggr_test_schema() -> SchemaRef {
     Arc::new(schema)
 }
 
+/// Get the schema for the aggregate_test_* csv files with an additional field not present in the files.
+pub fn aggr_test_schema_with_missing_col() -> SchemaRef {
+    let mut f1 = Field::new("c1", DataType::Utf8, false);
+    f1.set_metadata(Some(BTreeMap::from_iter(
+        vec![("testing".into(), "test".into())].into_iter(),
+    )));
+    let schema = Schema::new(vec![
+        f1,
+        Field::new("c2", DataType::UInt32, false),
+        Field::new("c3", DataType::Int8, false),
+        Field::new("c4", DataType::Int16, false),
+        Field::new("c5", DataType::Int32, false),
+        Field::new("c6", DataType::Int64, false),
+        Field::new("c7", DataType::UInt8, false),
+        Field::new("c8", DataType::UInt16, false),
+        Field::new("c9", DataType::UInt32, false),
+        Field::new("c10", DataType::UInt64, false),
+        Field::new("c11", DataType::Float32, false),
+        Field::new("c12", DataType::Float64, false),
+        Field::new("c13", DataType::Utf8, false),
+        Field::new("missing_col", DataType::Int64, true),
+    ]);
+
+    Arc::new(schema)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

Reply via email to