This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7c08a6fd00 refactor: Move SchemaAdapter from parquet module to data source (#10680)
7c08a6fd00 is described below
commit 7c08a6fd00fcb352c11889ed62de7c9948978c79
Author: Michael Maletich <[email protected]>
AuthorDate: Mon May 27 04:48:05 2024 -0500
refactor: Move SchemaAdapter from parquet module to data source (#10680)
* refactor: Move SchemaAdapter from parquet module to data source
This does not change behavior; it only moves the public location of
SchemaAdapter. SchemaAdapter was exposed in #10515 to allow callers to
define their own implementations. This PR moves it so that it can also
be used by other data sources (a sketch of a caller-defined adapter
follows below).
* fix comments surrounding tests to be accurate.
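As an illustration, here is a minimal sketch of a caller-defined adapter
written against the new public location, datafusion::datasource::schema_adapter.
The trait paths and method signatures come from this change; the PassThrough*
names are hypothetical, and the mapper returns batches unchanged where a real
adapter would cast, reorder, or back-fill columns.

    use std::sync::Arc;

    use datafusion::arrow::datatypes::{Schema, SchemaRef};
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::datasource::schema_adapter::{
        SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
    };
    use datafusion::error::Result;

    #[derive(Debug)]
    struct PassThroughAdapterFactory {}

    impl SchemaAdapterFactory for PassThroughAdapterFactory {
        fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
            Box::new(PassThroughAdapter {
                table_schema: schema,
            })
        }
    }

    struct PassThroughAdapter {
        /// Schema for the table
        table_schema: SchemaRef,
    }

    impl SchemaAdapter for PassThroughAdapter {
        // Look up the table column by name in the file schema
        fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
            let field = self.table_schema.field(index);
            Some(file_schema.fields.find(field.name())?.0)
        }

        // Project only the file columns that also exist in the table schema
        fn map_schema(
            &self,
            file_schema: &Schema,
        ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
            let mut projection = Vec::with_capacity(file_schema.fields().len());
            for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
                if self.table_schema.fields().find(file_field.name()).is_some() {
                    projection.push(file_idx);
                }
            }
            Ok((Arc::new(PassThroughMapping {}), projection))
        }
    }

    #[derive(Debug)]
    struct PassThroughMapping {}

    impl SchemaMapper for PassThroughMapping {
        // Pass the batch through unchanged; a real adapter would cast or
        // insert null columns here so the batch matches the table schema
        fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
            Ok(batch)
        }
    }

The factory is attached to a scan via ParquetExec's with_schema_adapter_factory,
exactly as the test added in this change does; a short wiring sketch follows the
new module's diff further below.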
---
.../core/src/datasource/file_format/parquet.rs | 6 +-
datafusion/core/src/datasource/mod.rs | 1 +
.../core/src/datasource/physical_plan/mod.rs | 136 +--------
.../src/datasource/physical_plan/parquet/mod.rs | 9 +-
.../physical_plan/parquet/schema_adapter.rs | 69 -----
datafusion/core/src/datasource/schema_adapter.rs | 337 +++++++++++++++++++++
datafusion/core/tests/parquet/mod.rs | 1 -
datafusion/core/tests/parquet/schema_adapter.rs | 163 ----------
8 files changed, 353 insertions(+), 369 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 7fcd41049c..e102cfc372 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -30,9 +30,9 @@ use crate::arrow::array::{
};
use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::physical_plan::{
- DefaultSchemaAdapterFactory, FileGroupDisplay, FileSinkConfig, ParquetExec,
- SchemaAdapterFactory,
+use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig, ParquetExec};
+use crate::datasource::schema_adapter::{
+ DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
use crate::error::Result;
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index 351967d353..c28788eed4 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -30,6 +30,7 @@ pub mod listing_table_factory;
pub mod memory;
pub mod physical_plan;
pub mod provider;
+pub mod schema_adapter;
mod statistics;
pub mod stream;
pub mod streaming;
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 6e19961f60..720e29e355 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -31,19 +31,7 @@ mod statistics;
pub(crate) use self::csv::plan_to_csv;
pub(crate) use self::json::plan_to_json;
#[cfg(feature = "parquet")]
-pub use self::parquet::{
- ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory, SchemaAdapter,
- SchemaAdapterFactory, SchemaMapper,
-};
-#[cfg(feature = "parquet")]
-use arrow::{
- array::new_null_array,
- compute::{can_cast_types, cast},
- datatypes::Schema,
- record_batch::{RecordBatch, RecordBatchOptions},
-};
-#[cfg(feature = "parquet")]
-use datafusion_common::plan_err;
+pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
pub use arrow_file::ArrowExec;
pub use avro::AvroExec;
@@ -247,119 +235,6 @@ where
Ok(())
}
-#[cfg(feature = "parquet")]
-#[derive(Clone, Debug, Default)]
-pub(crate) struct DefaultSchemaAdapterFactory {}
-
-#[cfg(feature = "parquet")]
-impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
- fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
- Box::new(DefaultSchemaAdapter { table_schema })
- }
-}
-
-#[cfg(feature = "parquet")]
-#[derive(Clone, Debug)]
-pub(crate) struct DefaultSchemaAdapter {
- /// Schema for the table
- table_schema: SchemaRef,
-}
-
-#[cfg(feature = "parquet")]
-impl SchemaAdapter for DefaultSchemaAdapter {
- /// Map a column index in the table schema to a column index in a
particular
- /// file schema
- ///
- /// Panics if index is not in range for the table schema
- fn map_column_index(&self, index: usize, file_schema: &Schema) ->
Option<usize> {
- let field = self.table_schema.field(index);
- Some(file_schema.fields.find(field.name())?.0)
- }
-
- /// Creates a `SchemaMapping` that can be used to cast or map the columns
from the file schema to the table schema.
- ///
- /// If the provided `file_schema` contains columns of a different type to
the expected
- /// `table_schema`, the method will attempt to cast the array data from
the file schema
- /// to the table schema where possible.
- ///
- /// Returns a [`SchemaMapping`] that can be applied to the output batch
- /// along with an ordered list of columns to project from the file
- fn map_schema(
- &self,
- file_schema: &Schema,
- ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
- let mut projection = Vec::with_capacity(file_schema.fields().len());
- let mut field_mappings = vec![None; self.table_schema.fields().len()];
-
- for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
- if let Some((table_idx, table_field)) =
- self.table_schema.fields().find(file_field.name())
- {
- match can_cast_types(file_field.data_type(),
table_field.data_type()) {
- true => {
- field_mappings[table_idx] = Some(projection.len());
- projection.push(file_idx);
- }
- false => {
- return plan_err!(
- "Cannot cast file schema field {} of type {:?} to
table schema field of type {:?}",
- file_field.name(),
- file_field.data_type(),
- table_field.data_type()
- )
- }
- }
- }
- }
-
- Ok((
- Arc::new(SchemaMapping {
- table_schema: self.table_schema.clone(),
- field_mappings,
- }),
- projection,
- ))
- }
-}
-
-/// The SchemaMapping struct holds a mapping from the file schema to the table
schema
-/// and any necessary type conversions that need to be applied.
-#[cfg(feature = "parquet")]
-#[derive(Debug)]
-pub struct SchemaMapping {
- /// The schema of the table. This is the expected schema after conversion
and it should match the schema of the query result.
- table_schema: SchemaRef,
- /// Mapping from field index in `table_schema` to index in projected
file_schema
- field_mappings: Vec<Option<usize>>,
-}
-
-#[cfg(feature = "parquet")]
-impl SchemaMapper for SchemaMapping {
- /// Adapts a `RecordBatch` to match the `table_schema` using the stored
mapping and conversions.
- fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
- let batch_rows = batch.num_rows();
- let batch_cols = batch.columns().to_vec();
-
- let cols = self
- .table_schema
- .fields()
- .iter()
- .zip(&self.field_mappings)
- .map(|(field, file_idx)| match file_idx {
- Some(batch_idx) => cast(&batch_cols[*batch_idx],
field.data_type()),
- None => Ok(new_null_array(field.data_type(), batch_rows)),
- })
- .collect::<Result<Vec<_>, _>>()?;
-
- // Necessary to handle empty batches
- let options =
RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
-
- let schema = self.table_schema.clone();
- let record_batch = RecordBatch::try_new_with_options(schema, cols,
&options)?;
- Ok(record_batch)
- }
-}
-
/// A single file or part of a file that should be read, along with its schema, statistics
pub struct FileMeta {
/// Path for the file (e.g. URL, filesystem path, etc)
@@ -621,11 +496,14 @@ mod tests {
use arrow_array::cast::AsArray;
use arrow_array::types::{Float32Type, Float64Type, UInt32Type};
use arrow_array::{
- BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, StringArray,
- UInt64Array,
+ BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, RecordBatch,
+ StringArray, UInt64Array,
};
- use arrow_schema::Field;
+ use arrow_schema::{Field, Schema};
+ use crate::datasource::schema_adapter::{
+ DefaultSchemaAdapterFactory, SchemaAdapterFactory,
+ };
use chrono::Utc;
#[test]
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 17cb6a66c7..9ee2b3a730 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -27,8 +27,8 @@ use crate::datasource::physical_plan::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::datasource::physical_plan::{
- parquet::page_filter::PagePruningPredicate, DefaultSchemaAdapterFactory, DisplayAs,
- FileGroupPartitioner, FileMeta, FileScanConfig,
+ parquet::page_filter::PagePruningPredicate, DisplayAs, FileGroupPartitioner,
+ FileMeta, FileScanConfig,
};
use crate::{
config::{ConfigOptions, TableParquetOptions},
@@ -67,12 +67,13 @@ mod metrics;
mod page_filter;
mod row_filter;
mod row_groups;
-mod schema_adapter;
mod statistics;
use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
+use crate::datasource::schema_adapter::{
+ DefaultSchemaAdapterFactory, SchemaAdapterFactory,
+};
pub use metrics::ParquetFileMetrics;
-pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
pub use statistics::{RequestedStatistics, StatisticsConverter};
/// Execution plan for scanning one or more Parquet partitions
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs b/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
deleted file mode 100644
index 193e5161a3..0000000000
--- a/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
+++ /dev/null
@@ -1,69 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow_array::RecordBatch;
-use arrow_schema::{Schema, SchemaRef};
-use std::fmt::Debug;
-use std::sync::Arc;
-
-/// Factory of schema adapters.
-///
-/// Provides means to implement custom schema adaptation.
-pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
- /// Provides `SchemaAdapter` for the ParquetExec.
- fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
-}
-
-/// A utility which can adapt file-level record batches to a table schema
which may have a schema
-/// obtained from merging multiple file-level schemas.
-///
-/// This is useful for enabling schema evolution in partitioned datasets.
-///
-/// This has to be done in two stages.
-///
-/// 1. Before reading the file, we have to map projected column indexes from
the table schema to
-/// the file schema.
-///
-/// 2. After reading a record batch we need to map the read columns back to
the expected columns
-/// indexes and insert null-valued columns wherever the file schema was
missing a colum present
-/// in the table schema.
-pub trait SchemaAdapter: Send + Sync {
- /// Map a column index in the table schema to a column index in a
particular
- /// file schema
- ///
- /// Panics if index is not in range for the table schema
- fn map_column_index(&self, index: usize, file_schema: &Schema) ->
Option<usize>;
-
- /// Creates a `SchemaMapping` that can be used to cast or map the columns
from the file schema to the table schema.
- ///
- /// If the provided `file_schema` contains columns of a different type to
the expected
- /// `table_schema`, the method will attempt to cast the array data from
the file schema
- /// to the table schema where possible.
- ///
- /// Returns a [`SchemaMapper`] that can be applied to the output batch
- /// along with an ordered list of columns to project from the file
- fn map_schema(
- &self,
- file_schema: &Schema,
- ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
-}
-
-/// Transforms a RecordBatch from Parquet to a RecordBatch that meets the
table schema.
-pub trait SchemaMapper: Send + Sync {
- /// Adapts a `RecordBatch` to match the `table_schema` using the stored
mapping and conversions.
- fn map_batch(&self, batch: RecordBatch) ->
datafusion_common::Result<RecordBatch>;
-}
diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs
new file mode 100644
index 0000000000..36d33379b8
--- /dev/null
+++ b/datafusion/core/src/datasource/schema_adapter.rs
@@ -0,0 +1,337 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Schema Adapter provides a method of translating the RecordBatches that come out of the
+//! physical format into how they should be used by DataFusion. For instance, a schema
+//! can be stored external to a parquet file that maps parquet logical types to arrow types.
+
+use arrow::compute::{can_cast_types, cast};
+use arrow_array::{new_null_array, RecordBatch, RecordBatchOptions};
+use arrow_schema::{Schema, SchemaRef};
+use datafusion_common::plan_err;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Factory of schema adapters.
+///
+/// Provides means to implement custom schema adaptation.
+pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
+ /// Provides `SchemaAdapter`.
+ fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
+}
+
+/// A utility which can adapt file-level record batches to a table schema which may have a schema
+/// obtained from merging multiple file-level schemas.
+///
+/// This is useful for enabling schema evolution in partitioned datasets.
+///
+/// This has to be done in two stages.
+///
+/// 1. Before reading the file, we have to map projected column indexes from the table schema to
+/// the file schema.
+///
+/// 2. After reading a record batch we need to map the read columns back to the expected column
+/// indexes and insert null-valued columns wherever the file schema was missing a column present
+/// in the table schema.
+pub trait SchemaAdapter: Send + Sync {
+ /// Map a column index in the table schema to a column index in a particular
+ /// file schema
+ ///
+ /// Panics if index is not in range for the table schema
+ fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;
+
+ /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
+ ///
+ /// If the provided `file_schema` contains columns of a different type to the expected
+ /// `table_schema`, the method will attempt to cast the array data from the file schema
+ /// to the table schema where possible.
+ ///
+ /// Returns a [`SchemaMapper`] that can be applied to the output batch
+ /// along with an ordered list of columns to project from the file
+ fn map_schema(
+ &self,
+ file_schema: &Schema,
+ ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
+}
+
+/// Transforms a RecordBatch from the physical layer to a RecordBatch that meets the table schema.
+pub trait SchemaMapper: Send + Sync {
+ /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
+ fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
+}
+
+#[derive(Clone, Debug, Default)]
+pub(crate) struct DefaultSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
+ fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+ Box::new(DefaultSchemaAdapter { table_schema })
+ }
+}
+
+#[derive(Clone, Debug)]
+pub(crate) struct DefaultSchemaAdapter {
+ /// Schema for the table
+ table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for DefaultSchemaAdapter {
+ /// Map a column index in the table schema to a column index in a particular
+ /// file schema
+ ///
+ /// Panics if index is not in range for the table schema
+ fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+ let field = self.table_schema.field(index);
+ Some(file_schema.fields.find(field.name())?.0)
+ }
+
+ /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
+ ///
+ /// If the provided `file_schema` contains columns of a different type to the expected
+ /// `table_schema`, the method will attempt to cast the array data from the file schema
+ /// to the table schema where possible.
+ ///
+ /// Returns a [`SchemaMapping`] that can be applied to the output batch
+ /// along with an ordered list of columns to project from the file
+ fn map_schema(
+ &self,
+ file_schema: &Schema,
+ ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+ let mut projection = Vec::with_capacity(file_schema.fields().len());
+ let mut field_mappings = vec![None; self.table_schema.fields().len()];
+
+ for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+ if let Some((table_idx, table_field)) =
+ self.table_schema.fields().find(file_field.name())
+ {
+ match can_cast_types(file_field.data_type(), table_field.data_type()) {
+ true => {
+ field_mappings[table_idx] = Some(projection.len());
+ projection.push(file_idx);
+ }
+ false => {
+ return plan_err!(
+ "Cannot cast file schema field {} of type {:?} to
table schema field of type {:?}",
+ file_field.name(),
+ file_field.data_type(),
+ table_field.data_type()
+ )
+ }
+ }
+ }
+ }
+
+ Ok((
+ Arc::new(SchemaMapping {
+ table_schema: self.table_schema.clone(),
+ field_mappings,
+ }),
+ projection,
+ ))
+ }
+}
+
+/// The SchemaMapping struct holds a mapping from the file schema to the table schema
+/// and any necessary type conversions that need to be applied.
+#[derive(Debug)]
+pub struct SchemaMapping {
+ /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result.
+ table_schema: SchemaRef,
+ /// Mapping from field index in `table_schema` to index in projected file_schema
+ field_mappings: Vec<Option<usize>>,
+}
+
+impl SchemaMapper for SchemaMapping {
+ /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
+ fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+ let batch_rows = batch.num_rows();
+ let batch_cols = batch.columns().to_vec();
+
+ let cols = self
+ .table_schema
+ .fields()
+ .iter()
+ .zip(&self.field_mappings)
+ .map(|(field, file_idx)| match file_idx {
+ Some(batch_idx) => cast(&batch_cols[*batch_idx], field.data_type()),
+ None => Ok(new_null_array(field.data_type(), batch_rows)),
+ })
+ .collect::<datafusion_common::Result<Vec<_>, _>>()?;
+
+ // Necessary to handle empty batches
+ let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
+
+ let schema = self.table_schema.clone();
+ let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
+ Ok(record_batch)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::fs;
+ use std::sync::Arc;
+
+ use crate::assert_batches_sorted_eq;
+ use arrow::datatypes::{Field, Schema};
+ use arrow::record_batch::RecordBatch;
+ use arrow_array::{Int32Array, StringArray};
+ use arrow_schema::{DataType, SchemaRef};
+ use object_store::path::Path;
+ use object_store::ObjectMeta;
+
+ use crate::datasource::object_store::ObjectStoreUrl;
+ use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
+ use crate::physical_plan::collect;
+ use crate::prelude::SessionContext;
+
+ use crate::datasource::listing::PartitionedFile;
+ use crate::datasource::schema_adapter::{
+ SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
+ };
+ use parquet::arrow::ArrowWriter;
+ use tempfile::TempDir;
+
+ #[tokio::test]
+ async fn can_override_schema_adapter() {
+ // Test shows that SchemaAdapter can add a column that doesn't exist in the
+ // record batches returned from parquet. This can be useful for schema evolution
+ // where older files may not have all columns.
+ let tmp_dir = TempDir::new().unwrap();
+ let table_dir = tmp_dir.path().join("parquet_test");
+ fs::DirBuilder::new().create(table_dir.as_path()).unwrap();
+ let f1 = Field::new("id", DataType::Int32, true);
+
+ let file_schema = Arc::new(Schema::new(vec![f1.clone()]));
+ let filename = "part.parquet".to_string();
+ let path = table_dir.as_path().join(filename.clone());
+ let file = fs::File::create(path.clone()).unwrap();
+ let mut writer = ArrowWriter::try_new(file, file_schema.clone(), None).unwrap();
+
+ let ids = Arc::new(Int32Array::from(vec![1i32]));
+ let rec_batch = RecordBatch::try_new(file_schema.clone(), vec![ids]).unwrap();
+
+ writer.write(&rec_batch).unwrap();
+ writer.close().unwrap();
+
+ let location = Path::parse(path.to_str().unwrap()).unwrap();
+ let metadata = std::fs::metadata(path.as_path()).expect("Local file metadata");
+ let meta = ObjectMeta {
+ location,
+ last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
+ size: metadata.len() as usize,
+ e_tag: None,
+ version: None,
+ };
+
+ let partitioned_file = PartitionedFile {
+ object_meta: meta,
+ partition_values: vec![],
+ range: None,
+ statistics: None,
+ extensions: None,
+ };
+
+ let f1 = Field::new("id", DataType::Int32, true);
+ let f2 = Field::new("extra_column", DataType::Utf8, true);
+
+ let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
+
+ // prepare the scan
+ let parquet_exec = ParquetExec::new(
+ FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
+ .with_file(partitioned_file),
+ None,
+ None,
+ Default::default(),
+ )
+ .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}));
+
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+ let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
+
+ let expected = [
+ "+----+--------------+",
+ "| id | extra_column |",
+ "+----+--------------+",
+ "| 1 | foo |",
+ "+----+--------------+",
+ ];
+
+ assert_batches_sorted_eq!(expected, &read);
+ }
+
+ #[derive(Debug)]
+ struct TestSchemaAdapterFactory {}
+
+ impl SchemaAdapterFactory for TestSchemaAdapterFactory {
+ fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+ Box::new(TestSchemaAdapter {
+ table_schema: schema,
+ })
+ }
+ }
+
+ struct TestSchemaAdapter {
+ /// Schema for the table
+ table_schema: SchemaRef,
+ }
+
+ impl SchemaAdapter for TestSchemaAdapter {
+ fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+ let field = self.table_schema.field(index);
+ Some(file_schema.fields.find(field.name())?.0)
+ }
+
+ fn map_schema(
+ &self,
+ file_schema: &Schema,
+ ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+ let mut projection = Vec::with_capacity(file_schema.fields().len());
+
+ for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+ if self.table_schema.fields().find(file_field.name()).is_some() {
+ projection.push(file_idx);
+ }
+ }
+
+ Ok((Arc::new(TestSchemaMapping {}), projection))
+ }
+ }
+
+ #[derive(Debug)]
+ struct TestSchemaMapping {}
+
+ impl SchemaMapper for TestSchemaMapping {
+ fn map_batch(
+ &self,
+ batch: RecordBatch,
+ ) -> datafusion_common::Result<RecordBatch> {
+ let f1 = Field::new("id", DataType::Int32, true);
+ let f2 = Field::new("extra_column", DataType::Utf8, true);
+
+ let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
+
+ let extra_column = Arc::new(StringArray::from(vec!["foo"]));
+ let mut new_columns = batch.columns().to_vec();
+ new_columns.push(extra_column);
+
+ Ok(RecordBatch::try_new(schema, new_columns).unwrap())
+ }
+ }
+}
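For reference, wiring a custom factory into a scan mirrors the test added above.
A hedged fragment (it assumes `schema`, `partitioned_file`, and the imports built
exactly as in that test, and uses the hypothetical PassThroughAdapterFactory from
the earlier sketch):

    // Build the scan over the local filesystem and swap in the custom adapter
    let parquet_exec = ParquetExec::new(
        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
            .with_file(partitioned_file),
        None,
        None,
        Default::default(),
    )
    .with_schema_adapter_factory(Arc::new(PassThroughAdapterFactory {}));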
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index a0b62d7001..94ae9ff601 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -49,7 +49,6 @@ mod filter_pushdown;
mod page_pruning;
mod row_group_pruning;
mod schema;
-mod schema_adapter;
mod schema_coercion;
#[cfg(test)]
diff --git a/datafusion/core/tests/parquet/schema_adapter.rs b/datafusion/core/tests/parquet/schema_adapter.rs
deleted file mode 100644
index ead2884e43..0000000000
--- a/datafusion/core/tests/parquet/schema_adapter.rs
+++ /dev/null
@@ -1,163 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::fs;
-use std::sync::Arc;
-
-use arrow::datatypes::{Field, Schema};
-use arrow::record_batch::RecordBatch;
-use arrow_array::{Int32Array, StringArray};
-use arrow_schema::{DataType, SchemaRef};
-use datafusion::assert_batches_sorted_eq;
-use object_store::path::Path;
-use object_store::ObjectMeta;
-
-use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::datasource::physical_plan::{
- FileScanConfig, ParquetExec, SchemaAdapter, SchemaAdapterFactory,
SchemaMapper,
-};
-use datafusion::physical_plan::collect;
-use datafusion::prelude::SessionContext;
-
-use datafusion::datasource::listing::PartitionedFile;
-use parquet::arrow::ArrowWriter;
-use tempfile::TempDir;
-
-#[tokio::test]
-async fn can_override_schema_adapter() {
- // Create several parquet files in same directoty / table with
- // same schema but different metadata
- let tmp_dir = TempDir::new().unwrap();
- let table_dir = tmp_dir.path().join("parquet_test");
- fs::DirBuilder::new().create(table_dir.as_path()).unwrap();
- let f1 = Field::new("id", DataType::Int32, true);
-
- let file_schema = Arc::new(Schema::new(vec![f1.clone()]));
- let filename = "part.parquet".to_string();
- let path = table_dir.as_path().join(filename.clone());
- let file = fs::File::create(path.clone()).unwrap();
- let mut writer = ArrowWriter::try_new(file, file_schema.clone(),
None).unwrap();
-
- let ids = Arc::new(Int32Array::from(vec![1i32]));
- let rec_batch = RecordBatch::try_new(file_schema.clone(),
vec![ids]).unwrap();
-
- writer.write(&rec_batch).unwrap();
- writer.close().unwrap();
-
- let location = Path::parse(path.to_str().unwrap()).unwrap();
- let metadata = std::fs::metadata(path.as_path()).expect("Local file
metadata");
- let meta = ObjectMeta {
- location,
- last_modified:
metadata.modified().map(chrono::DateTime::from).unwrap(),
- size: metadata.len() as usize,
- e_tag: None,
- version: None,
- };
-
- let partitioned_file = PartitionedFile {
- object_meta: meta,
- partition_values: vec![],
- range: None,
- statistics: None,
- extensions: None,
- };
-
- let f1 = Field::new("id", DataType::Int32, true);
- let f2 = Field::new("extra_column", DataType::Utf8, true);
-
- let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
-
- // prepare the scan
- let parquet_exec = ParquetExec::new(
- FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
- .with_file(partitioned_file),
- None,
- None,
- Default::default(),
- )
- .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}));
-
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
- let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
-
- let expected = [
- "+----+--------------+",
- "| id | extra_column |",
- "+----+--------------+",
- "| 1 | foo |",
- "+----+--------------+",
- ];
-
- assert_batches_sorted_eq!(expected, &read);
-}
-
-#[derive(Debug)]
-struct TestSchemaAdapterFactory {}
-
-impl SchemaAdapterFactory for TestSchemaAdapterFactory {
- fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
- Box::new(TestSchemaAdapter {
- table_schema: schema,
- })
- }
-}
-
-struct TestSchemaAdapter {
- /// Schema for the table
- table_schema: SchemaRef,
-}
-
-impl SchemaAdapter for TestSchemaAdapter {
- fn map_column_index(&self, index: usize, file_schema: &Schema) ->
Option<usize> {
- let field = self.table_schema.field(index);
- Some(file_schema.fields.find(field.name())?.0)
- }
-
- fn map_schema(
- &self,
- file_schema: &Schema,
- ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
- let mut projection = Vec::with_capacity(file_schema.fields().len());
-
- for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
- if self.table_schema.fields().find(file_field.name()).is_some() {
- projection.push(file_idx);
- }
- }
-
- Ok((Arc::new(TestSchemaMapping {}), projection))
- }
-}
-
-#[derive(Debug)]
-struct TestSchemaMapping {}
-
-impl SchemaMapper for TestSchemaMapping {
- fn map_batch(&self, batch: RecordBatch) ->
datafusion_common::Result<RecordBatch> {
- let f1 = Field::new("id", DataType::Int32, true);
- let f2 = Field::new("extra_column", DataType::Utf8, true);
-
- let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
-
- let extra_column = Arc::new(StringArray::from(vec!["foo"]));
- let mut new_columns = batch.columns().to_vec();
- new_columns.push(extra_column);
-
- Ok(RecordBatch::try_new(schema, new_columns).unwrap())
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]