This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 7bec762 Create SchemaAdapter trait to map table schema to file
schemas (#1709)
7bec762 is described below
commit 7bec762d1f1ebef4801af2eefd7a5033c474fe77
Author: Dan Harris <[email protected]>
AuthorDate: Mon Jan 31 15:41:21 2022 -0500
Create SchemaAdapter trait to map table schema to file schemas (#1709)
* Create SchemaAdapter trait to map table schema to file schemas
* Linting fix
* Remove commented code
---
datafusion/src/physical_plan/file_format/avro.rs | 64 +++++++++-
datafusion/src/physical_plan/file_format/csv.rs | 47 +++++++
datafusion/src/physical_plan/file_format/json.rs | 43 +++++++
datafusion/src/physical_plan/file_format/mod.rs | 140 +++++++++++++++++++++
.../src/physical_plan/file_format/parquet.rs | 68 ++--------
datafusion/src/test_util.rs | 26 ++++
6 files changed, 330 insertions(+), 58 deletions(-)
diff --git a/datafusion/src/physical_plan/file_format/avro.rs
b/datafusion/src/physical_plan/file_format/avro.rs
index 9de8ffd..b389686 100644
--- a/datafusion/src/physical_plan/file_format/avro.rs
+++ b/datafusion/src/physical_plan/file_format/avro.rs
@@ -165,13 +165,14 @@ impl ExecutionPlan for AvroExec {
#[cfg(test)]
#[cfg(feature = "avro")]
mod tests {
-
use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
use crate::datasource::object_store::local::{
local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
};
use crate::scalar::ScalarValue;
+ use arrow::datatypes::{DataType, Field, Schema};
use futures::StreamExt;
+ use sqlparser::ast::ObjectType::Schema;
use super::*;
@@ -229,6 +230,67 @@ mod tests {
}
#[tokio::test]
+ async fn avro_exec_missing_column() -> Result<()> {
+ let testdata = crate::test_util::arrow_test_data();
+ let filename = format!("{}/avro/alltypes_plain.avro", testdata);
+ let actual_schema = AvroFormat {}
+ .infer_schema(local_object_reader_stream(vec![filename]))
+ .await?;
+
+ let mut fields = actual_schema.fields().clone();
+ fields.push(Field::new("missing_col", DataType::Int32, true));
+
+ let file_schema = Arc::new(Schema::new(fields));
+
+ let avro_exec = AvroExec::new(FileScanConfig {
+ object_store: Arc::new(LocalFileSystem {}),
+ file_groups:
vec![vec![local_unpartitioned_file(filename.clone())]],
+ file_schema,
+ statistics: Statistics::default(),
+ // Include the missing column in the projection
+ projection: Some(vec![0, 1, 2, file_schema.fields().len()]),
+ limit: None,
+ table_partition_cols: vec![],
+ });
+ assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
+
+ let mut results = avro_exec.execute(0).await.expect("plan execution
failed");
+ let batch = results
+ .next()
+ .await
+ .expect("plan iterator empty")
+ .expect("plan iterator returned an error");
+
+ let expected = vec![
+ "+----+----------+-------------+-------------+",
+ "| id | bool_col | tinyint_col | missing_col |",
+ "+----+----------+-------------+-------------+",
+ "| 4 | true | 0 | |",
+ "| 5 | false | 1 | |",
+ "| 6 | true | 0 | |",
+ "| 7 | false | 1 | |",
+ "| 2 | true | 0 | |",
+ "| 3 | false | 1 | |",
+ "| 0 | true | 0 | |",
+ "| 1 | false | 1 | |",
+ "+----+----------+-------------+-------------+",
+ ];
+
+ crate::assert_batches_eq!(expected, &[batch]);
+
+ let batch = results.next().await;
+ assert!(batch.is_none());
+
+ let batch = results.next().await;
+ assert!(batch.is_none());
+
+ let batch = results.next().await;
+ assert!(batch.is_none());
+
+ Ok(())
+ }
+
+ #[tokio::test]
async fn avro_exec_with_partition() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
diff --git a/datafusion/src/physical_plan/file_format/csv.rs
b/datafusion/src/physical_plan/file_format/csv.rs
index 5cff3b6..4cf70f6 100644
--- a/datafusion/src/physical_plan/file_format/csv.rs
+++ b/datafusion/src/physical_plan/file_format/csv.rs
@@ -170,6 +170,7 @@ impl ExecutionPlan for CsvExec {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::test_util::aggr_test_schema_with_missing_col;
use crate::{
datasource::object_store::local::{local_unpartitioned_file,
LocalFileSystem},
scalar::ScalarValue,
@@ -270,6 +271,52 @@ mod tests {
}
#[tokio::test]
+ async fn csv_exec_with_missing_column() -> Result<()> {
+ let runtime = Arc::new(RuntimeEnv::default());
+ let file_schema = aggr_test_schema_with_missing_col();
+ let testdata = crate::test_util::arrow_test_data();
+ let filename = "aggregate_test_100.csv";
+ let path = format!("{}/csv/{}", testdata, filename);
+ let csv = CsvExec::new(
+ FileScanConfig {
+ object_store: Arc::new(LocalFileSystem {}),
+ file_schema,
+ file_groups: vec![vec![local_unpartitioned_file(path)]],
+ statistics: Statistics::default(),
+ projection: None,
+ limit: Some(5),
+ table_partition_cols: vec![],
+ },
+ true,
+ b',',
+ );
+ assert_eq!(14, csv.base_config.file_schema.fields().len());
+ assert_eq!(14, csv.projected_schema.fields().len());
+ assert_eq!(14, csv.schema().fields().len());
+
+ let mut it = csv.execute(0, runtime).await?;
+ let batch = it.next().await.unwrap()?;
+ assert_eq!(14, batch.num_columns());
+ assert_eq!(5, batch.num_rows());
+
+ let expected = vec![
+
"+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+-------------+",
+ "| c1 | c2 | c3 | c4 | c5 | c6 | c7
| c8 | c9 | c10 | c11 | c12
| c13 | missing_col |",
+
"+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+-------------+",
+ "| c | 2 | 1 | 18109 | 2033001162 | -6513304855495910254 | 25
| 43062 | 1491205016 | 5863949479783605708 | 0.110830784 | 0.9294097332465232
| 6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW | |",
+ "| d | 5 | -40 | 22614 | 706441268 | -7542719935673075327 |
155 | 14337 | 3373581039 | 11720144131976083864 | 0.69632107 |
0.3114712539863804 | C2GT5KVyOPZpgKVl110TyZO0NcJ434 | |",
+ "| b | 1 | 29 | -18218 | 994303988 | 5983957848665088916 |
204 | 9489 | 3275293996 | 14857091259186476033 | 0.53840446 |
0.17909035118828576 | AyYVExXK6AR2qUTxNZ7qRHQOVGMLcz | |",
+ "| a | 1 | -85 | -15154 | 1171968280 | 1919439543497968449 | 77
| 52286 | 774637006 | 12101411955859039553 | 0.12285209 | 0.6864391962767343
| 0keZ5G8BffGwgF2RwQD59TFzMStxCB | |",
+ "| b | 5 | -82 | 22080 | 1824882165 | 7373730676428214987 |
208 | 34331 | 3342719438 | 3330177516592499461 | 0.82634634 |
0.40975383525297016 | Ig1QcuKsjHXkproePdERo2w0mYzIqd | |",
+
"+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+-------------+",
+ ];
+
+ crate::assert_batches_eq!(expected, &[batch]);
+
+ Ok(())
+ }
+
+ #[tokio::test]
async fn csv_exec_with_partition() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
let file_schema = aggr_test_schema();
diff --git a/datafusion/src/physical_plan/file_format/json.rs
b/datafusion/src/physical_plan/file_format/json.rs
index 0fc95d1..ac41306 100644
--- a/datafusion/src/physical_plan/file_format/json.rs
+++ b/datafusion/src/physical_plan/file_format/json.rs
@@ -137,6 +137,8 @@ impl ExecutionPlan for NdJsonExec {
#[cfg(test)]
mod tests {
+ use arrow::array::Array;
+ use arrow::datatypes::{Field, Schema};
use futures::StreamExt;
use crate::datasource::{
@@ -212,6 +214,47 @@ mod tests {
}
#[tokio::test]
+ async fn nd_json_exec_file_with_missing_column() -> Result<()> {
+ let runtime = Arc::new(RuntimeEnv::default());
+ use arrow::datatypes::DataType;
+ let path = format!("{}/1.json", TEST_DATA_BASE);
+
+ let actual_schema = infer_schema(path.clone()).await?;
+
+ let mut fields = actual_schema.fields().clone();
+ fields.push(Field::new("missing_col", DataType::Int32, true));
+ let missing_field_idx = fields.len() - 1;
+
+ let file_schema = Arc::new(Schema::new(fields));
+
+ let exec = NdJsonExec::new(FileScanConfig {
+ object_store: Arc::new(LocalFileSystem {}),
+ file_groups: vec![vec![local_unpartitioned_file(path.clone())]],
+ file_schema,
+ statistics: Statistics::default(),
+ projection: None,
+ limit: Some(3),
+ table_partition_cols: vec![],
+ });
+
+ let mut it = exec.execute(0, runtime).await?;
+ let batch = it.next().await.unwrap()?;
+
+ assert_eq!(batch.num_rows(), 3);
+ let values = batch
+ .column(missing_field_idx)
+ .as_any()
+ .downcast_ref::<arrow::array::Int32Array>()
+ .unwrap();
+ assert_eq!(values.len(), 3);
+ assert!(values.is_null(0));
+ assert!(values.is_null(1));
+ assert!(values.is_null(2));
+
+ Ok(())
+ }
+
+ #[tokio::test]
async fn nd_json_exec_file_projection() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::default());
let path = format!("{}/1.json", TEST_DATA_BASE);
diff --git a/datafusion/src/physical_plan/file_format/mod.rs
b/datafusion/src/physical_plan/file_format/mod.rs
index b655cdb..7658add 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -35,11 +35,15 @@ pub use avro::AvroExec;
pub use csv::CsvExec;
pub use json::NdJsonExec;
+use crate::error::DataFusionError;
use crate::{
datasource::{object_store::ObjectStore, PartitionedFile},
+ error::Result,
scalar::ScalarValue,
};
+use arrow::array::new_null_array;
use lazy_static::lazy_static;
+use log::info;
use std::{
collections::HashMap,
fmt::{Display, Formatter, Result as FmtResult},
@@ -165,6 +169,87 @@ impl<'a> Display for FileGroupsDisplay<'a> {
}
}
+/// A utility which can adapt file-level record batches to a table schema
which may have been
+/// obtained from merging multiple file-level schemas.
+///
+/// This is useful for enabling schema evolution in partitioned datasets.
+///
+/// This has to be done in two stages.
+///
+/// 1. Before reading the file, we have to map projected column indexes from
the table schema to
+/// the file schema.
+///
+/// 2. After reading a record batch we need to map the read columns back to
the expected columns
+/// indexes and insert null-valued columns wherever the file schema was
missing a column present
+/// in the table schema.
+#[derive(Clone, Debug)]
+pub(crate) struct SchemaAdapter {
+ /// Schema for the table
+ table_schema: SchemaRef,
+}
+
+impl SchemaAdapter {
+ pub(crate) fn new(table_schema: SchemaRef) -> SchemaAdapter {
+ Self { table_schema }
+ }
+
+ /// Map projected column indexes to the file schema. This will fail if the
table schema
+ /// and the file schema contain a field with the same name and different
types.
+ pub fn map_projections(
+ &self,
+ file_schema: &Schema,
+ projections: &[usize],
+ ) -> Result<Vec<usize>> {
+ let mut mapped: Vec<usize> = vec![];
+ for idx in projections {
+ let field = self.table_schema.field(*idx);
+ if let Ok(mapped_idx) =
file_schema.index_of(field.name().as_str()) {
+ if file_schema.field(mapped_idx).data_type() ==
field.data_type() {
+ mapped.push(mapped_idx)
+ } else {
+ let msg = format!("Failed to map column projection for
field {}. Incompatible data types {:?} and {:?}", field.name(),
file_schema.field(mapped_idx).data_type(), field.data_type());
+ info!("{}", msg);
+ return Err(DataFusionError::Execution(msg));
+ }
+ }
+ }
+ Ok(mapped)
+ }
+
+ /// Re-order projected columns by index in record batch to match table
schema column ordering. If the record
+ /// batch does not contain a column for an expected field, insert a
null-valued column at the
+ /// required column index.
+ pub fn adapt_batch(
+ &self,
+ batch: RecordBatch,
+ projections: &[usize],
+ ) -> Result<RecordBatch> {
+ let batch_rows = batch.num_rows();
+
+ let batch_schema = batch.schema();
+
+ let mut cols: Vec<ArrayRef> =
Vec::with_capacity(batch.columns().len());
+ let batch_cols = batch.columns().to_vec();
+
+ for field_idx in projections {
+ let table_field = &self.table_schema.fields()[*field_idx];
+ if let Some((batch_idx, _name)) =
+ batch_schema.column_with_name(table_field.name().as_str())
+ {
+ cols.push(batch_cols[batch_idx].clone());
+ } else {
+ cols.push(new_null_array(table_field.data_type(), batch_rows))
+ }
+ }
+
+ let projected_schema =
Arc::new(self.table_schema.clone().project(projections)?);
+
+ let merged_batch = RecordBatch::try_new(projected_schema, cols)?;
+
+ Ok(merged_batch)
+ }
+}
+
/// A helper that projects partition columns into the file record batches.
///
/// One interesting trick is the usage of a cache for the key buffers of the
partition column
@@ -467,6 +552,61 @@ mod tests {
crate::assert_batches_eq!(expected, &[projected_batch]);
}
+ #[test]
+ fn schema_adapter_adapt_projections() {
+ let table_schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Utf8, true),
+ Field::new("c2", DataType::Int64, true),
+ Field::new("c3", DataType::Int8, true),
+ ]));
+
+ let file_schema = Schema::new(vec![
+ Field::new("c1", DataType::Utf8, true),
+ Field::new("c2", DataType::Int64, true),
+ ]);
+
+ let file_schema_2 = Arc::new(Schema::new(vec![
+ Field::new("c3", DataType::Int8, true),
+ Field::new("c2", DataType::Int64, true),
+ ]));
+
+ let file_schema_3 =
+ Arc::new(Schema::new(vec![Field::new("c3", DataType::Float32,
true)]));
+
+ let adapter = SchemaAdapter::new(table_schema);
+
+ let projections1: Vec<usize> = vec![0, 1, 2];
+ let projections2: Vec<usize> = vec![2];
+
+ let mapped = adapter
+ .map_projections(&file_schema, projections1.as_slice())
+ .expect("mapping projections");
+
+ assert_eq!(mapped, vec![0, 1]);
+
+ let mapped = adapter
+ .map_projections(&file_schema, projections2.as_slice())
+ .expect("mapping projections");
+
+ assert!(mapped.is_empty());
+
+ let mapped = adapter
+ .map_projections(&file_schema_2, projections1.as_slice())
+ .expect("mapping projections");
+
+ assert_eq!(mapped, vec![1, 0]);
+
+ let mapped = adapter
+ .map_projections(&file_schema_2, projections2.as_slice())
+ .expect("mapping projections");
+
+ assert_eq!(mapped, vec![0]);
+
+ let mapped = adapter.map_projections(&file_schema_3,
projections1.as_slice());
+
+ assert!(mapped.is_err());
+ }
+
// sets default for configs that play no role in projections
fn config_for_projection(
file_schema: SchemaRef,
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs
b/datafusion/src/physical_plan/file_format/parquet.rs
index 905bb1e..40acf5a 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -44,14 +44,13 @@ use arrow::{
error::{ArrowError, Result as ArrowResult},
record_batch::RecordBatch,
};
-use log::{debug, info};
+use log::debug;
use parquet::file::{
metadata::RowGroupMetaData,
reader::{FileReader, SerializedFileReader},
statistics::Statistics as ParquetStatistics,
};
-use arrow::array::new_null_array;
use fmt::Debug;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
@@ -61,6 +60,7 @@ use tokio::{
};
use crate::execution::runtime_env::RuntimeEnv;
+use crate::physical_plan::file_format::SchemaAdapter;
use async_trait::async_trait;
use super::PartitionColumnProjector;
@@ -215,11 +215,12 @@ impl ExecutionPlan for ParquetExec {
&self.base_config.table_partition_cols,
);
- let file_schema_ref = self.base_config().file_schema.clone();
+ let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
+
let join_handle = task::spawn_blocking(move || {
if let Err(e) = read_partition(
object_store.as_ref(),
- file_schema_ref,
+ adapter,
partition_index,
&partition,
metrics,
@@ -420,33 +421,10 @@ fn build_row_group_predicate(
}
}
-// Map projections from the schema which merges all file schemas to
projections on a particular
-// file
-fn map_projections(
- merged_schema: &Schema,
- file_schema: &Schema,
- projections: &[usize],
-) -> Result<Vec<usize>> {
- let mut mapped: Vec<usize> = vec![];
- for idx in projections {
- let field = merged_schema.field(*idx);
- if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
- if file_schema.field(mapped_idx).data_type() == field.data_type() {
- mapped.push(mapped_idx)
- } else {
- let msg = format!("Failed to map column projection for field
{}. Incompatible data types {:?} and {:?}", field.name(),
file_schema.field(mapped_idx).data_type(), field.data_type());
- info!("{}", msg);
- return Err(DataFusionError::Execution(msg));
- }
- }
- }
- Ok(mapped)
-}
-
#[allow(clippy::too_many_arguments)]
fn read_partition(
object_store: &dyn ObjectStore,
- file_schema: SchemaRef,
+ schema_adapter: SchemaAdapter,
partition_index: usize,
partition: &[PartitionedFile],
metrics: ExecutionPlanMetricsSet,
@@ -480,44 +458,20 @@ fn read_partition(
}
let mut arrow_reader =
ParquetFileArrowReader::new(Arc::new(file_reader));
- let mapped_projections =
- map_projections(&file_schema, &arrow_reader.get_schema()?,
projection)?;
+ let adapted_projections =
+ schema_adapter.map_projections(&arrow_reader.get_schema()?,
projection)?;
let mut batch_reader =
- arrow_reader.get_record_reader_by_columns(mapped_projections,
batch_size)?;
+ arrow_reader.get_record_reader_by_columns(adapted_projections,
batch_size)?;
loop {
match batch_reader.next() {
Some(Ok(batch)) => {
- let total_cols = &file_schema.fields().len();
- let batch_rows = batch.num_rows();
total_rows += batch.num_rows();
- let batch_schema = batch.schema();
-
- let mut cols: Vec<ArrayRef> =
Vec::with_capacity(*total_cols);
- let batch_cols = batch.columns().to_vec();
-
- for field_idx in projection {
- let merged_field = &file_schema.fields()[*field_idx];
- if let Some((batch_idx, _name)) =
-
batch_schema.column_with_name(merged_field.name().as_str())
- {
- cols.push(batch_cols[batch_idx].clone());
- } else {
- cols.push(new_null_array(
- merged_field.data_type(),
- batch_rows,
- ))
- }
- }
-
- let projected_schema =
file_schema.clone().project(projection)?;
-
- let merged_batch =
- RecordBatch::try_new(Arc::new(projected_schema),
cols)?;
+ let adapted_batch = schema_adapter.adapt_batch(batch,
projection)?;
let proj_batch = partition_column_projector
- .project(merged_batch,
&partitioned_file.partition_values);
+ .project(adapted_batch,
&partitioned_file.partition_values);
send_result(&response_tx, proj_batch)?;
if limit.map(|l| total_rows >= l).unwrap_or(false) {
diff --git a/datafusion/src/test_util.rs b/datafusion/src/test_util.rs
index af66503..8ee0298 100644
--- a/datafusion/src/test_util.rs
+++ b/datafusion/src/test_util.rs
@@ -257,6 +257,32 @@ pub fn aggr_test_schema() -> SchemaRef {
Arc::new(schema)
}
+/// Get the schema for the aggregate_test_* csv files with an additional field
not present in the files.
+pub fn aggr_test_schema_with_missing_col() -> SchemaRef {
+ let mut f1 = Field::new("c1", DataType::Utf8, false);
+ f1.set_metadata(Some(BTreeMap::from_iter(
+ vec![("testing".into(), "test".into())].into_iter(),
+ )));
+ let schema = Schema::new(vec![
+ f1,
+ Field::new("c2", DataType::UInt32, false),
+ Field::new("c3", DataType::Int8, false),
+ Field::new("c4", DataType::Int16, false),
+ Field::new("c5", DataType::Int32, false),
+ Field::new("c6", DataType::Int64, false),
+ Field::new("c7", DataType::UInt8, false),
+ Field::new("c8", DataType::UInt16, false),
+ Field::new("c9", DataType::UInt32, false),
+ Field::new("c10", DataType::UInt64, false),
+ Field::new("c11", DataType::Float32, false),
+ Field::new("c12", DataType::Float64, false),
+ Field::new("c13", DataType::Utf8, false),
+ Field::new("missing_col", DataType::Int64, true),
+ ]);
+
+ Arc::new(schema)
+}
+
#[cfg(test)]
mod tests {
use super::*;