This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new deaa8ac Handle merging of evolved schemas in ParquetExec (#1622)
deaa8ac is described below
commit deaa8acba785a0233a58ced311e5f092f4744bc1
Author: Dan Harris <[email protected]>
AuthorDate: Sun Jan 23 08:34:49 2022 -0500
Handle merging of evolved schemas in ParquetExec (#1622)
* Handle merging of evolved schemas in ParquetExec
* Handle merging of evolved schemas in ParquetExec
* Linting fix
* Avoid unnecessary search by field name
* Add round trip parquet testing
* PR comments:
1. Add additional test cases
2. Map projected column indexes in all cases
3. Raise an explicit error in the case where there is a conflict between a
file schema and the merged schema.
* Remove redundant test case and revert submodule changes
* Linting
* Linting
* Using an info message (as depending on the use case this may signal a user
error rather than something to investigate) Using a DataFusionError rather than
one from Parquet (the rationale being that this error comes from DataFusion,
and is not related to being able to read the parquet files)
Co-authored-by: Andrew Lamb <[email protected]>
* PR comments: Clarify incompatible schema test
* Linting
* Clippy
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/src/datasource/file_format/parquet.rs | 22 +-
.../src/physical_plan/file_format/parquet.rs | 373 ++++++++++++++++++++-
2 files changed, 374 insertions(+), 21 deletions(-)
diff --git a/datafusion/src/datasource/file_format/parquet.rs
b/datafusion/src/datasource/file_format/parquet.rs
index eedb4c9..4afb2f5 100644
--- a/datafusion/src/datasource/file_format/parquet.rs
+++ b/datafusion/src/datasource/file_format/parquet.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
-use futures::stream::StreamExt;
+use futures::TryStreamExt;
use parquet::arrow::ArrowReader;
use parquet::arrow::ParquetFileArrowReader;
use parquet::errors::ParquetError;
@@ -87,16 +87,15 @@ impl FileFormat for ParquetFormat {
self
}
- async fn infer_schema(&self, mut readers: ObjectReaderStream) ->
Result<SchemaRef> {
- // We currently get the schema information from the first file rather
than do
- // schema merging and this is a limitation.
- // See https://issues.apache.org/jira/browse/ARROW-11017
- let first_file = readers
- .next()
- .await
- .ok_or_else(|| DataFusionError::Plan("No data file
found".to_owned()))??;
- let schema = fetch_schema(first_file)?;
- Ok(Arc::new(schema))
+ async fn infer_schema(&self, readers: ObjectReaderStream) ->
Result<SchemaRef> {
+ let merged_schema = readers
+ .try_fold(Schema::empty(), |acc, reader| async {
+ let next_schema = fetch_schema(reader);
+ Schema::try_merge([acc, next_schema?])
+ .map_err(DataFusionError::ArrowError)
+ })
+ .await?;
+ Ok(Arc::new(merged_schema))
}
async fn infer_stats(&self, reader: Arc<dyn ObjectReader>) ->
Result<Statistics> {
@@ -367,6 +366,7 @@ mod tests {
};
use super::*;
+
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use arrow::array::{
BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs
b/datafusion/src/physical_plan/file_format/parquet.rs
index 5768d0a..d240fe2 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -44,13 +44,14 @@ use arrow::{
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};
-use log::debug;
+use log::{debug, info};
use parquet::file::{
metadata::RowGroupMetaData,
reader::{FileReader, SerializedFileReader},
statistics::Statistics as ParquetStatistics,
};
+use arrow::array::new_null_array;
use fmt::Debug;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
@@ -214,9 +215,11 @@ impl ExecutionPlan for ParquetExec {
&self.base_config.table_partition_cols,
);
+ let file_schema_ref = self.base_config().file_schema.clone();
let join_handle = task::spawn_blocking(move || {
if let Err(e) = read_partition(
object_store.as_ref(),
+ file_schema_ref,
partition_index,
partition,
metrics,
@@ -414,9 +417,33 @@ fn build_row_group_predicate(
}
}
+// Map projections from the schema which merges all file schemas to
projections on a particular
+// file
+fn map_projections(
+ merged_schema: &Schema,
+ file_schema: &Schema,
+ projections: &[usize],
+) -> Result<Vec<usize>> {
+ let mut mapped: Vec<usize> = vec![];
+ for idx in projections {
+ let field = merged_schema.field(*idx);
+ if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
+ if file_schema.field(mapped_idx).data_type() == field.data_type() {
+ mapped.push(mapped_idx)
+ } else {
+ let msg = format!("Failed to map column projection for field
{}. Incompatible data types {:?} and {:?}", field.name(),
file_schema.field(mapped_idx).data_type(), field.data_type());
+ info!("{}", msg);
+ return Err(DataFusionError::Execution(msg));
+ }
+ }
+ }
+ Ok(mapped)
+}
+
#[allow(clippy::too_many_arguments)]
fn read_partition(
object_store: &dyn ObjectStore,
+ file_schema: SchemaRef,
partition_index: usize,
partition: Vec<PartitionedFile>,
metrics: ExecutionPlanMetricsSet,
@@ -429,6 +456,8 @@ fn read_partition(
) -> Result<()> {
let mut total_rows = 0;
'outer: for partitioned_file in partition {
+ debug!("Reading file {}", &partitioned_file.file_meta.path());
+
let file_metrics = ParquetFileMetrics::new(
partition_index,
&*partitioned_file.file_meta.path(),
@@ -446,15 +475,46 @@ fn read_partition(
);
file_reader.filter_row_groups(&row_group_predicate);
}
+
let mut arrow_reader =
ParquetFileArrowReader::new(Arc::new(file_reader));
- let mut batch_reader = arrow_reader
- .get_record_reader_by_columns(projection.to_owned(), batch_size)?;
+ let mapped_projections =
+ map_projections(&file_schema, &arrow_reader.get_schema()?,
projection)?;
+
+ let mut batch_reader =
+ arrow_reader.get_record_reader_by_columns(mapped_projections,
batch_size)?;
loop {
match batch_reader.next() {
Some(Ok(batch)) => {
+ let total_cols = &file_schema.fields().len();
+ let batch_rows = batch.num_rows();
total_rows += batch.num_rows();
+
+ let batch_schema = batch.schema();
+
+ let mut cols: Vec<ArrayRef> =
Vec::with_capacity(*total_cols);
+ let batch_cols = batch.columns().to_vec();
+
+ for field_idx in projection {
+ let merged_field = &file_schema.fields()[*field_idx];
+ if let Some((batch_idx, _name)) =
+
batch_schema.column_with_name(merged_field.name().as_str())
+ {
+ cols.push(batch_cols[batch_idx].clone());
+ } else {
+ cols.push(new_null_array(
+ merged_field.data_type(),
+ batch_rows,
+ ))
+ }
+ }
+
+ let projected_schema =
file_schema.clone().project(projection)?;
+
+ let merged_batch =
+ RecordBatch::try_new(Arc::new(projected_schema),
cols)?;
+
let proj_batch = partition_column_projector
- .project(batch, &partitioned_file.partition_values);
+ .project(merged_batch,
&partitioned_file.partition_values);
send_result(&response_tx, proj_batch)?;
if limit.map(|l| total_rows >= l).unwrap_or(false) {
@@ -486,22 +546,315 @@ fn read_partition(
#[cfg(test)]
mod tests {
- use crate::datasource::{
- file_format::{parquet::ParquetFormat, FileFormat},
- object_store::local::{
- local_object_reader_stream, local_unpartitioned_file,
LocalFileSystem,
+ use crate::{
+ assert_batches_sorted_eq,
+ datasource::{
+ file_format::{parquet::ParquetFormat, FileFormat},
+ object_store::local::{
+ local_object_reader_stream, local_unpartitioned_file,
LocalFileSystem,
+ },
},
+ physical_plan::collect,
};
use super::*;
- use arrow::datatypes::{DataType, Field};
+ use arrow::array::Float32Array;
+ use arrow::{
+ array::{Int64Array, Int8Array, StringArray},
+ datatypes::{DataType, Field},
+ };
use futures::StreamExt;
use parquet::{
+ arrow::ArrowWriter,
basic::Type as PhysicalType,
- file::{metadata::RowGroupMetaData, statistics::Statistics as
ParquetStatistics},
+ file::{
+ metadata::RowGroupMetaData, properties::WriterProperties,
+ statistics::Statistics as ParquetStatistics,
+ },
schema::types::SchemaDescPtr,
};
+ /// writes each RecordBatch as an individual parquet file and then
+ /// reads it back in to the named location.
+ async fn round_trip_to_parquet(
+ batches: Vec<RecordBatch>,
+ projection: Option<Vec<usize>>,
+ schema: Option<SchemaRef>,
+ ) -> Vec<RecordBatch> {
+ // When vec is dropped, temp files are deleted
+ let files: Vec<_> = batches
+ .into_iter()
+ .map(|batch| {
+ let output = tempfile::NamedTempFile::new().expect("creating
temp file");
+
+ let props = WriterProperties::builder().build();
+ let file: std::fs::File = (*output.as_file())
+ .try_clone()
+ .expect("cloning file descriptor");
+ let mut writer = ArrowWriter::try_new(file, batch.schema(),
Some(props))
+ .expect("creating writer");
+
+ writer.write(&batch).expect("Writing batch");
+ writer.close().unwrap();
+ output
+ })
+ .collect();
+
+ let file_names: Vec<_> = files
+ .iter()
+ .map(|t| t.path().to_string_lossy().to_string())
+ .collect();
+
+ // Now, read the files back in
+ let file_groups: Vec<_> = file_names
+ .iter()
+ .map(|name| local_unpartitioned_file(name.clone()))
+ .collect();
+
+ // Infer the schema (if not provided)
+ let file_schema = match schema {
+ Some(provided_schema) => provided_schema,
+ None => ParquetFormat::default()
+ .infer_schema(local_object_reader_stream(file_names))
+ .await
+ .expect("inferring schema"),
+ };
+
+ // prepare the scan
+ let parquet_exec = ParquetExec::new(
+ FileScanConfig {
+ object_store: Arc::new(LocalFileSystem {}),
+ file_groups: vec![file_groups],
+ file_schema,
+ statistics: Statistics::default(),
+ projection,
+ limit: None,
+ table_partition_cols: vec![],
+ },
+ None,
+ );
+
+ let runtime = Arc::new(RuntimeEnv::default());
+ collect(Arc::new(parquet_exec), runtime)
+ .await
+ .expect("reading parquet data")
+ }
+
+ // Add a new column with the specified field name to the RecordBatch
+ fn add_to_batch(
+ batch: &RecordBatch,
+ field_name: &str,
+ array: ArrayRef,
+ ) -> RecordBatch {
+ let mut fields = batch.schema().fields().clone();
+ fields.push(Field::new(field_name, array.data_type().clone(), true));
+ let schema = Arc::new(Schema::new(fields));
+
+ let mut columns = batch.columns().to_vec();
+ columns.push(array);
+ RecordBatch::try_new(schema, columns).expect("error; creating record
batch")
+ }
+
+ fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
+ columns.into_iter().fold(
+ RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+ |batch, (field_name, arr)| add_to_batch(&batch, field_name,
arr.clone()),
+ )
+ }
+
+ #[tokio::test]
+ async fn evolved_schema() {
+ let c1: ArrayRef =
+ Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+ // batch1: c1(string)
+ let batch1 = add_to_batch(
+ &RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
+ "c1",
+ c1,
+ );
+
+ // batch2: c1(string) and c2(int64)
+ let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2),
None]));
+ let batch2 = add_to_batch(&batch1, "c2", c2);
+
+ // batch3: c1(string) and c3(int8)
+ let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20),
None]));
+ let batch3 = add_to_batch(&batch1, "c3", c3);
+
+ // read/write them files:
+ let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None,
None).await;
+ let expected = vec![
+ "+-----+----+----+",
+ "| c1 | c2 | c3 |",
+ "+-----+----+----+",
+ "| | | |",
+ "| | | 20 |",
+ "| | 2 | |",
+ "| Foo | | |",
+ "| Foo | | 10 |",
+ "| Foo | 1 | |",
+ "| bar | | |",
+ "| bar | | |",
+ "| bar | | |",
+ "+-----+----+----+",
+ ];
+ assert_batches_sorted_eq!(expected, &read);
+ }
+
+ #[tokio::test]
+ async fn evolved_schema_inconsistent_order() {
+ let c1: ArrayRef =
+ Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+ let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2),
None]));
+
+ let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20),
None]));
+
+ // batch1: c1(string), c2(int64), c3(int8)
+ let batch1 = create_batch(vec![
+ ("c1", c1.clone()),
+ ("c2", c2.clone()),
+ ("c3", c3.clone()),
+ ]);
+
+ // batch2: c3(int8), c2(int64), c1(string)
+ let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
+
+ // read/write them files:
+ let read = round_trip_to_parquet(vec![batch1, batch2], None,
None).await;
+ let expected = vec![
+ "+-----+----+----+",
+ "| c1 | c2 | c3 |",
+ "+-----+----+----+",
+ "| Foo | 1 | 10 |",
+ "| | 2 | 20 |",
+ "| bar | | |",
+ "| Foo | 1 | 10 |",
+ "| | 2 | 20 |",
+ "| bar | | |",
+ "+-----+----+----+",
+ ];
+ assert_batches_sorted_eq!(expected, &read);
+ }
+
+ #[tokio::test]
+ async fn evolved_schema_intersection() {
+ let c1: ArrayRef =
+ Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+ let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2),
None]));
+
+ let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20),
None]));
+
+ // batch1: c1(string), c2(int64), c3(int8)
+ let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
+
+ // batch2: c3(int8), c2(int64), c1(string)
+ let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
+
+ // read/write them files:
+ let read = round_trip_to_parquet(vec![batch1, batch2], None,
None).await;
+ let expected = vec![
+ "+-----+----+----+",
+ "| c1 | c3 | c2 |",
+ "+-----+----+----+",
+ "| Foo | 10 | |",
+ "| | 20 | |",
+ "| bar | | |",
+ "| | 10 | 1 |",
+ "| | 20 | 2 |",
+ "| | | |",
+ "+-----+----+----+",
+ ];
+ assert_batches_sorted_eq!(expected, &read);
+ }
+
+ #[tokio::test]
+ async fn evolved_schema_projection() {
+ let c1: ArrayRef =
+ Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+ let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2),
None]));
+
+ let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20),
None]));
+
+ let c4: ArrayRef =
+ Arc::new(StringArray::from(vec![Some("baz"), Some("boo"), None]));
+
+ // batch1: c1(string), c2(int64), c3(int8)
+ let batch1 = create_batch(vec![
+ ("c1", c1.clone()),
+ ("c2", c2.clone()),
+ ("c3", c3.clone()),
+ ]);
+
+ // batch2: c3(int8), c2(int64), c1(string), c4(string)
+ let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1),
("c4", c4)]);
+
+ // read/write them files:
+ let read =
+ round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]),
None).await;
+ let expected = vec![
+ "+-----+-----+",
+ "| c1 | c4 |",
+ "+-----+-----+",
+ "| Foo | baz |",
+ "| | boo |",
+ "| bar | |",
+ "| Foo | |",
+ "| | |",
+ "| bar | |",
+ "+-----+-----+",
+ ];
+ assert_batches_sorted_eq!(expected, &read);
+ }
+
+ #[tokio::test]
+ async fn evolved_schema_incompatible_types() {
+ let c1: ArrayRef =
+ Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+ let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2),
None]));
+
+ let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20),
None]));
+
+ let c4: ArrayRef =
+ Arc::new(Float32Array::from(vec![Some(1.0_f32), Some(2.0_f32),
None]));
+
+ // batch1: c1(string), c2(int64), c3(int8)
+ let batch1 = create_batch(vec![
+ ("c1", c1.clone()),
+ ("c2", c2.clone()),
+ ("c3", c3.clone()),
+ ]);
+
+ // batch2: c3(int8), c2(int64), c1(string), c4(string)
+ let batch2 = create_batch(vec![("c3", c4), ("c2", c2), ("c1", c1)]);
+
+ let schema = Schema::new(vec![
+ Field::new("c1", DataType::Utf8, true),
+ Field::new("c2", DataType::Int64, true),
+ Field::new("c3", DataType::Int8, true),
+ ]);
+
+ // read/write them files:
+ let read =
+ round_trip_to_parquet(vec![batch1, batch2], None,
Some(Arc::new(schema)))
+ .await;
+
+ // expect only the first batch to be read
+ let expected = vec![
+ "+-----+----+----+",
+ "| c1 | c2 | c3 |",
+ "+-----+----+----+",
+ "| Foo | 1 | 10 |",
+ "| | 2 | 20 |",
+ "| bar | | |",
+ "+-----+----+----+",
+ ];
+ assert_batches_sorted_eq!(expected, &read);
+ }
+
#[tokio::test]
async fn parquet_exec_with_projection() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());