This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c08a6fd00 refactor: Move SchemaAdapter from parquet module to data 
source (#10680)
7c08a6fd00 is described below

commit 7c08a6fd00fcb352c11889ed62de7c9948978c79
Author: Michael Maletich <[email protected]>
AuthorDate: Mon May 27 04:48:05 2024 -0500

    refactor: Move SchemaAdapter from parquet module to data source (#10680)
    
    * refactor: Move SchemaAdapter from parquet module to data source
    
    This is not a change in behavior except moving the public location of 
SchemaAdapter.  SchemaAdapter was exposed
    in #10515 to allow callers to define their own implementation.  This PR 
then changes the location so that it could be used in other
    data sources.
    
    * fix comments surrounding tests to be accurate.
---
 .../core/src/datasource/file_format/parquet.rs     |   6 +-
 datafusion/core/src/datasource/mod.rs              |   1 +
 .../core/src/datasource/physical_plan/mod.rs       | 136 +--------
 .../src/datasource/physical_plan/parquet/mod.rs    |   9 +-
 .../physical_plan/parquet/schema_adapter.rs        |  69 -----
 datafusion/core/src/datasource/schema_adapter.rs   | 337 +++++++++++++++++++++
 datafusion/core/tests/parquet/mod.rs               |   1 -
 datafusion/core/tests/parquet/schema_adapter.rs    | 163 ----------
 8 files changed, 353 insertions(+), 369 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs 
b/datafusion/core/src/datasource/file_format/parquet.rs
index 7fcd41049c..e102cfc372 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -30,9 +30,9 @@ use crate::arrow::array::{
 };
 use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::physical_plan::{
-    DefaultSchemaAdapterFactory, FileGroupDisplay, FileSinkConfig, ParquetExec,
-    SchemaAdapterFactory,
+use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig, 
ParquetExec};
+use crate::datasource::schema_adapter::{
+    DefaultSchemaAdapterFactory, SchemaAdapterFactory,
 };
 use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
diff --git a/datafusion/core/src/datasource/mod.rs 
b/datafusion/core/src/datasource/mod.rs
index 351967d353..c28788eed4 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -30,6 +30,7 @@ pub mod listing_table_factory;
 pub mod memory;
 pub mod physical_plan;
 pub mod provider;
+pub mod schema_adapter;
 mod statistics;
 pub mod stream;
 pub mod streaming;
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs 
b/datafusion/core/src/datasource/physical_plan/mod.rs
index 6e19961f60..720e29e355 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -31,19 +31,7 @@ mod statistics;
 pub(crate) use self::csv::plan_to_csv;
 pub(crate) use self::json::plan_to_json;
 #[cfg(feature = "parquet")]
-pub use self::parquet::{
-    ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory, SchemaAdapter,
-    SchemaAdapterFactory, SchemaMapper,
-};
-#[cfg(feature = "parquet")]
-use arrow::{
-    array::new_null_array,
-    compute::{can_cast_types, cast},
-    datatypes::Schema,
-    record_batch::{RecordBatch, RecordBatchOptions},
-};
-#[cfg(feature = "parquet")]
-use datafusion_common::plan_err;
+pub use self::parquet::{ParquetExec, ParquetFileMetrics, 
ParquetFileReaderFactory};
 
 pub use arrow_file::ArrowExec;
 pub use avro::AvroExec;
@@ -247,119 +235,6 @@ where
     Ok(())
 }
 
-#[cfg(feature = "parquet")]
-#[derive(Clone, Debug, Default)]
-pub(crate) struct DefaultSchemaAdapterFactory {}
-
-#[cfg(feature = "parquet")]
-impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
-    fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
-        Box::new(DefaultSchemaAdapter { table_schema })
-    }
-}
-
-#[cfg(feature = "parquet")]
-#[derive(Clone, Debug)]
-pub(crate) struct DefaultSchemaAdapter {
-    /// Schema for the table
-    table_schema: SchemaRef,
-}
-
-#[cfg(feature = "parquet")]
-impl SchemaAdapter for DefaultSchemaAdapter {
-    /// Map a column index in the table schema to a column index in a 
particular
-    /// file schema
-    ///
-    /// Panics if index is not in range for the table schema
-    fn map_column_index(&self, index: usize, file_schema: &Schema) -> 
Option<usize> {
-        let field = self.table_schema.field(index);
-        Some(file_schema.fields.find(field.name())?.0)
-    }
-
-    /// Creates a `SchemaMapping` that can be used to cast or map the columns 
from the file schema to the table schema.
-    ///
-    /// If the provided `file_schema` contains columns of a different type to 
the expected
-    /// `table_schema`, the method will attempt to cast the array data from 
the file schema
-    /// to the table schema where possible.
-    ///
-    /// Returns a [`SchemaMapping`] that can be applied to the output batch
-    /// along with an ordered list of columns to project from the file
-    fn map_schema(
-        &self,
-        file_schema: &Schema,
-    ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-        let mut projection = Vec::with_capacity(file_schema.fields().len());
-        let mut field_mappings = vec![None; self.table_schema.fields().len()];
-
-        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
-            if let Some((table_idx, table_field)) =
-                self.table_schema.fields().find(file_field.name())
-            {
-                match can_cast_types(file_field.data_type(), 
table_field.data_type()) {
-                    true => {
-                        field_mappings[table_idx] = Some(projection.len());
-                        projection.push(file_idx);
-                    }
-                    false => {
-                        return plan_err!(
-                            "Cannot cast file schema field {} of type {:?} to 
table schema field of type {:?}",
-                            file_field.name(),
-                            file_field.data_type(),
-                            table_field.data_type()
-                        )
-                    }
-                }
-            }
-        }
-
-        Ok((
-            Arc::new(SchemaMapping {
-                table_schema: self.table_schema.clone(),
-                field_mappings,
-            }),
-            projection,
-        ))
-    }
-}
-
-/// The SchemaMapping struct holds a mapping from the file schema to the table 
schema
-/// and any necessary type conversions that need to be applied.
-#[cfg(feature = "parquet")]
-#[derive(Debug)]
-pub struct SchemaMapping {
-    /// The schema of the table. This is the expected schema after conversion 
and it should match the schema of the query result.
-    table_schema: SchemaRef,
-    /// Mapping from field index in `table_schema` to index in projected 
file_schema
-    field_mappings: Vec<Option<usize>>,
-}
-
-#[cfg(feature = "parquet")]
-impl SchemaMapper for SchemaMapping {
-    /// Adapts a `RecordBatch` to match the `table_schema` using the stored 
mapping and conversions.
-    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
-        let batch_rows = batch.num_rows();
-        let batch_cols = batch.columns().to_vec();
-
-        let cols = self
-            .table_schema
-            .fields()
-            .iter()
-            .zip(&self.field_mappings)
-            .map(|(field, file_idx)| match file_idx {
-                Some(batch_idx) => cast(&batch_cols[*batch_idx], 
field.data_type()),
-                None => Ok(new_null_array(field.data_type(), batch_rows)),
-            })
-            .collect::<Result<Vec<_>, _>>()?;
-
-        // Necessary to handle empty batches
-        let options = 
RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
-
-        let schema = self.table_schema.clone();
-        let record_batch = RecordBatch::try_new_with_options(schema, cols, 
&options)?;
-        Ok(record_batch)
-    }
-}
-
 /// A single file or part of a file that should be read, along with its 
schema, statistics
 pub struct FileMeta {
     /// Path for the file (e.g. URL, filesystem path, etc)
@@ -621,11 +496,14 @@ mod tests {
     use arrow_array::cast::AsArray;
     use arrow_array::types::{Float32Type, Float64Type, UInt32Type};
     use arrow_array::{
-        BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, 
StringArray,
-        UInt64Array,
+        BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, 
RecordBatch,
+        StringArray, UInt64Array,
     };
-    use arrow_schema::Field;
+    use arrow_schema::{Field, Schema};
 
+    use crate::datasource::schema_adapter::{
+        DefaultSchemaAdapterFactory, SchemaAdapterFactory,
+    };
     use chrono::Utc;
 
     #[test]
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 17cb6a66c7..9ee2b3a730 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -27,8 +27,8 @@ use crate::datasource::physical_plan::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
 };
 use crate::datasource::physical_plan::{
-    parquet::page_filter::PagePruningPredicate, DefaultSchemaAdapterFactory, 
DisplayAs,
-    FileGroupPartitioner, FileMeta, FileScanConfig,
+    parquet::page_filter::PagePruningPredicate, DisplayAs, 
FileGroupPartitioner,
+    FileMeta, FileScanConfig,
 };
 use crate::{
     config::{ConfigOptions, TableParquetOptions},
@@ -67,12 +67,13 @@ mod metrics;
 mod page_filter;
 mod row_filter;
 mod row_groups;
-mod schema_adapter;
 mod statistics;
 
 use crate::datasource::physical_plan::parquet::row_groups::RowGroupSet;
+use crate::datasource::schema_adapter::{
+    DefaultSchemaAdapterFactory, SchemaAdapterFactory,
+};
 pub use metrics::ParquetFileMetrics;
-pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
 pub use statistics::{RequestedStatistics, StatisticsConverter};
 
 /// Execution plan for scanning one or more Parquet partitions
diff --git 
a/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
deleted file mode 100644
index 193e5161a3..0000000000
--- a/datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
+++ /dev/null
@@ -1,69 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow_array::RecordBatch;
-use arrow_schema::{Schema, SchemaRef};
-use std::fmt::Debug;
-use std::sync::Arc;
-
-/// Factory of schema adapters.
-///
-/// Provides means to implement custom schema adaptation.
-pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
-    /// Provides `SchemaAdapter` for the ParquetExec.
-    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
-}
-
-/// A utility which can adapt file-level record batches to a table schema 
which may have a schema
-/// obtained from merging multiple file-level schemas.
-///
-/// This is useful for enabling schema evolution in partitioned datasets.
-///
-/// This has to be done in two stages.
-///
-/// 1. Before reading the file, we have to map projected column indexes from 
the table schema to
-///    the file schema.
-///
-/// 2. After reading a record batch we need to map the read columns back to 
the expected columns
-///    indexes and insert null-valued columns wherever the file schema was 
missing a colum present
-///    in the table schema.
-pub trait SchemaAdapter: Send + Sync {
-    /// Map a column index in the table schema to a column index in a 
particular
-    /// file schema
-    ///
-    /// Panics if index is not in range for the table schema
-    fn map_column_index(&self, index: usize, file_schema: &Schema) -> 
Option<usize>;
-
-    /// Creates a `SchemaMapping` that can be used to cast or map the columns 
from the file schema to the table schema.
-    ///
-    /// If the provided `file_schema` contains columns of a different type to 
the expected
-    /// `table_schema`, the method will attempt to cast the array data from 
the file schema
-    /// to the table schema where possible.
-    ///
-    /// Returns a [`SchemaMapper`] that can be applied to the output batch
-    /// along with an ordered list of columns to project from the file
-    fn map_schema(
-        &self,
-        file_schema: &Schema,
-    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
-}
-
-/// Transforms a RecordBatch from Parquet to a RecordBatch that meets the 
table schema.
-pub trait SchemaMapper: Send + Sync {
-    /// Adapts a `RecordBatch` to match the `table_schema` using the stored 
mapping and conversions.
-    fn map_batch(&self, batch: RecordBatch) -> 
datafusion_common::Result<RecordBatch>;
-}
diff --git a/datafusion/core/src/datasource/schema_adapter.rs 
b/datafusion/core/src/datasource/schema_adapter.rs
new file mode 100644
index 0000000000..36d33379b8
--- /dev/null
+++ b/datafusion/core/src/datasource/schema_adapter.rs
@@ -0,0 +1,337 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Schema Adapter provides a method of translating the RecordBatches that 
come out of the
+//! physical format into how they should be used by DataFusion.  For instance, 
a schema
+//! can be stored external to a parquet file that maps parquet logical types 
to arrow types.
+
+use arrow::compute::{can_cast_types, cast};
+use arrow_array::{new_null_array, RecordBatch, RecordBatchOptions};
+use arrow_schema::{Schema, SchemaRef};
+use datafusion_common::plan_err;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// Factory of schema adapters.
+///
+/// Provides means to implement custom schema adaptation.
+pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
+    /// Provides `SchemaAdapter`.
+    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
+}
+
+/// A utility which can adapt file-level record batches to a table schema 
which may have a schema
+/// obtained from merging multiple file-level schemas.
+///
+/// This is useful for enabling schema evolution in partitioned datasets.
+///
+/// This has to be done in two stages.
+///
+/// 1. Before reading the file, we have to map projected column indexes from 
the table schema to
+///    the file schema.
+///
+/// 2. After reading a record batch we need to map the read columns back to 
the expected columns
+///    indexes and insert null-valued columns wherever the file schema was 
missing a colum present
+///    in the table schema.
+pub trait SchemaAdapter: Send + Sync {
+    /// Map a column index in the table schema to a column index in a 
particular
+    /// file schema
+    ///
+    /// Panics if index is not in range for the table schema
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> 
Option<usize>;
+
+    /// Creates a `SchemaMapping` that can be used to cast or map the columns 
from the file schema to the table schema.
+    ///
+    /// If the provided `file_schema` contains columns of a different type to 
the expected
+    /// `table_schema`, the method will attempt to cast the array data from 
the file schema
+    /// to the table schema where possible.
+    ///
+    /// Returns a [`SchemaMapper`] that can be applied to the output batch
+    /// along with an ordered list of columns to project from the file
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
+}
+
+/// Transforms a RecordBatch from the physical layer to a RecordBatch that 
meets the table schema.
+pub trait SchemaMapper: Send + Sync {
+    /// Adapts a `RecordBatch` to match the `table_schema` using the stored 
mapping and conversions.
+    fn map_batch(&self, batch: RecordBatch) -> 
datafusion_common::Result<RecordBatch>;
+}
+
+#[derive(Clone, Debug, Default)]
+pub(crate) struct DefaultSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for DefaultSchemaAdapterFactory {
+    fn create(&self, table_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+        Box::new(DefaultSchemaAdapter { table_schema })
+    }
+}
+
+#[derive(Clone, Debug)]
+pub(crate) struct DefaultSchemaAdapter {
+    /// Schema for the table
+    table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for DefaultSchemaAdapter {
+    /// Map a column index in the table schema to a column index in a 
particular
+    /// file schema
+    ///
+    /// Panics if index is not in range for the table schema
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> 
Option<usize> {
+        let field = self.table_schema.field(index);
+        Some(file_schema.fields.find(field.name())?.0)
+    }
+
+    /// Creates a `SchemaMapping` that can be used to cast or map the columns 
from the file schema to the table schema.
+    ///
+    /// If the provided `file_schema` contains columns of a different type to 
the expected
+    /// `table_schema`, the method will attempt to cast the array data from 
the file schema
+    /// to the table schema where possible.
+    ///
+    /// Returns a [`SchemaMapping`] that can be applied to the output batch
+    /// along with an ordered list of columns to project from the file
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        let mut projection = Vec::with_capacity(file_schema.fields().len());
+        let mut field_mappings = vec![None; self.table_schema.fields().len()];
+
+        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+            if let Some((table_idx, table_field)) =
+                self.table_schema.fields().find(file_field.name())
+            {
+                match can_cast_types(file_field.data_type(), 
table_field.data_type()) {
+                    true => {
+                        field_mappings[table_idx] = Some(projection.len());
+                        projection.push(file_idx);
+                    }
+                    false => {
+                        return plan_err!(
+                            "Cannot cast file schema field {} of type {:?} to 
table schema field of type {:?}",
+                            file_field.name(),
+                            file_field.data_type(),
+                            table_field.data_type()
+                        )
+                    }
+                }
+            }
+        }
+
+        Ok((
+            Arc::new(SchemaMapping {
+                table_schema: self.table_schema.clone(),
+                field_mappings,
+            }),
+            projection,
+        ))
+    }
+}
+
+/// The SchemaMapping struct holds a mapping from the file schema to the table 
schema
+/// and any necessary type conversions that need to be applied.
+#[derive(Debug)]
+pub struct SchemaMapping {
+    /// The schema of the table. This is the expected schema after conversion 
and it should match the schema of the query result.
+    table_schema: SchemaRef,
+    /// Mapping from field index in `table_schema` to index in projected 
file_schema
+    field_mappings: Vec<Option<usize>>,
+}
+
+impl SchemaMapper for SchemaMapping {
+    /// Adapts a `RecordBatch` to match the `table_schema` using the stored 
mapping and conversions.
+    fn map_batch(&self, batch: RecordBatch) -> 
datafusion_common::Result<RecordBatch> {
+        let batch_rows = batch.num_rows();
+        let batch_cols = batch.columns().to_vec();
+
+        let cols = self
+            .table_schema
+            .fields()
+            .iter()
+            .zip(&self.field_mappings)
+            .map(|(field, file_idx)| match file_idx {
+                Some(batch_idx) => cast(&batch_cols[*batch_idx], 
field.data_type()),
+                None => Ok(new_null_array(field.data_type(), batch_rows)),
+            })
+            .collect::<datafusion_common::Result<Vec<_>, _>>()?;
+
+        // Necessary to handle empty batches
+        let options = 
RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
+
+        let schema = self.table_schema.clone();
+        let record_batch = RecordBatch::try_new_with_options(schema, cols, 
&options)?;
+        Ok(record_batch)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::fs;
+    use std::sync::Arc;
+
+    use crate::assert_batches_sorted_eq;
+    use arrow::datatypes::{Field, Schema};
+    use arrow::record_batch::RecordBatch;
+    use arrow_array::{Int32Array, StringArray};
+    use arrow_schema::{DataType, SchemaRef};
+    use object_store::path::Path;
+    use object_store::ObjectMeta;
+
+    use crate::datasource::object_store::ObjectStoreUrl;
+    use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
+    use crate::physical_plan::collect;
+    use crate::prelude::SessionContext;
+
+    use crate::datasource::listing::PartitionedFile;
+    use crate::datasource::schema_adapter::{
+        SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
+    };
+    use parquet::arrow::ArrowWriter;
+    use tempfile::TempDir;
+
+    #[tokio::test]
+    async fn can_override_schema_adapter() {
+        // Test shows that SchemaAdapter can add a column that doesn't existin 
in the
+        // record batches returned from parquet.  This can be useful for 
schema evolution
+        // where older files may not have all columns.
+        let tmp_dir = TempDir::new().unwrap();
+        let table_dir = tmp_dir.path().join("parquet_test");
+        fs::DirBuilder::new().create(table_dir.as_path()).unwrap();
+        let f1 = Field::new("id", DataType::Int32, true);
+
+        let file_schema = Arc::new(Schema::new(vec![f1.clone()]));
+        let filename = "part.parquet".to_string();
+        let path = table_dir.as_path().join(filename.clone());
+        let file = fs::File::create(path.clone()).unwrap();
+        let mut writer = ArrowWriter::try_new(file, file_schema.clone(), 
None).unwrap();
+
+        let ids = Arc::new(Int32Array::from(vec![1i32]));
+        let rec_batch = RecordBatch::try_new(file_schema.clone(), 
vec![ids]).unwrap();
+
+        writer.write(&rec_batch).unwrap();
+        writer.close().unwrap();
+
+        let location = Path::parse(path.to_str().unwrap()).unwrap();
+        let metadata = std::fs::metadata(path.as_path()).expect("Local file 
metadata");
+        let meta = ObjectMeta {
+            location,
+            last_modified: 
metadata.modified().map(chrono::DateTime::from).unwrap(),
+            size: metadata.len() as usize,
+            e_tag: None,
+            version: None,
+        };
+
+        let partitioned_file = PartitionedFile {
+            object_meta: meta,
+            partition_values: vec![],
+            range: None,
+            statistics: None,
+            extensions: None,
+        };
+
+        let f1 = Field::new("id", DataType::Int32, true);
+        let f2 = Field::new("extra_column", DataType::Utf8, true);
+
+        let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
+
+        // prepare the scan
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
+                .with_file(partitioned_file),
+            None,
+            None,
+            Default::default(),
+        )
+        .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}));
+
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+        let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
+
+        let expected = [
+            "+----+--------------+",
+            "| id | extra_column |",
+            "+----+--------------+",
+            "| 1  | foo          |",
+            "+----+--------------+",
+        ];
+
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+    #[derive(Debug)]
+    struct TestSchemaAdapterFactory {}
+
+    impl SchemaAdapterFactory for TestSchemaAdapterFactory {
+        fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+            Box::new(TestSchemaAdapter {
+                table_schema: schema,
+            })
+        }
+    }
+
+    struct TestSchemaAdapter {
+        /// Schema for the table
+        table_schema: SchemaRef,
+    }
+
+    impl SchemaAdapter for TestSchemaAdapter {
+        fn map_column_index(&self, index: usize, file_schema: &Schema) -> 
Option<usize> {
+            let field = self.table_schema.field(index);
+            Some(file_schema.fields.find(field.name())?.0)
+        }
+
+        fn map_schema(
+            &self,
+            file_schema: &Schema,
+        ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+            let mut projection = 
Vec::with_capacity(file_schema.fields().len());
+
+            for (file_idx, file_field) in 
file_schema.fields.iter().enumerate() {
+                if 
self.table_schema.fields().find(file_field.name()).is_some() {
+                    projection.push(file_idx);
+                }
+            }
+
+            Ok((Arc::new(TestSchemaMapping {}), projection))
+        }
+    }
+
+    #[derive(Debug)]
+    struct TestSchemaMapping {}
+
+    impl SchemaMapper for TestSchemaMapping {
+        fn map_batch(
+            &self,
+            batch: RecordBatch,
+        ) -> datafusion_common::Result<RecordBatch> {
+            let f1 = Field::new("id", DataType::Int32, true);
+            let f2 = Field::new("extra_column", DataType::Utf8, true);
+
+            let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
+
+            let extra_column = Arc::new(StringArray::from(vec!["foo"]));
+            let mut new_columns = batch.columns().to_vec();
+            new_columns.push(extra_column);
+
+            Ok(RecordBatch::try_new(schema, new_columns).unwrap())
+        }
+    }
+}
diff --git a/datafusion/core/tests/parquet/mod.rs 
b/datafusion/core/tests/parquet/mod.rs
index a0b62d7001..94ae9ff601 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -49,7 +49,6 @@ mod filter_pushdown;
 mod page_pruning;
 mod row_group_pruning;
 mod schema;
-mod schema_adapter;
 mod schema_coercion;
 
 #[cfg(test)]
diff --git a/datafusion/core/tests/parquet/schema_adapter.rs 
b/datafusion/core/tests/parquet/schema_adapter.rs
deleted file mode 100644
index ead2884e43..0000000000
--- a/datafusion/core/tests/parquet/schema_adapter.rs
+++ /dev/null
@@ -1,163 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::fs;
-use std::sync::Arc;
-
-use arrow::datatypes::{Field, Schema};
-use arrow::record_batch::RecordBatch;
-use arrow_array::{Int32Array, StringArray};
-use arrow_schema::{DataType, SchemaRef};
-use datafusion::assert_batches_sorted_eq;
-use object_store::path::Path;
-use object_store::ObjectMeta;
-
-use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::datasource::physical_plan::{
-    FileScanConfig, ParquetExec, SchemaAdapter, SchemaAdapterFactory, 
SchemaMapper,
-};
-use datafusion::physical_plan::collect;
-use datafusion::prelude::SessionContext;
-
-use datafusion::datasource::listing::PartitionedFile;
-use parquet::arrow::ArrowWriter;
-use tempfile::TempDir;
-
-#[tokio::test]
-async fn can_override_schema_adapter() {
-    // Create several parquet files in same directoty / table with
-    // same schema but different metadata
-    let tmp_dir = TempDir::new().unwrap();
-    let table_dir = tmp_dir.path().join("parquet_test");
-    fs::DirBuilder::new().create(table_dir.as_path()).unwrap();
-    let f1 = Field::new("id", DataType::Int32, true);
-
-    let file_schema = Arc::new(Schema::new(vec![f1.clone()]));
-    let filename = "part.parquet".to_string();
-    let path = table_dir.as_path().join(filename.clone());
-    let file = fs::File::create(path.clone()).unwrap();
-    let mut writer = ArrowWriter::try_new(file, file_schema.clone(), 
None).unwrap();
-
-    let ids = Arc::new(Int32Array::from(vec![1i32]));
-    let rec_batch = RecordBatch::try_new(file_schema.clone(), 
vec![ids]).unwrap();
-
-    writer.write(&rec_batch).unwrap();
-    writer.close().unwrap();
-
-    let location = Path::parse(path.to_str().unwrap()).unwrap();
-    let metadata = std::fs::metadata(path.as_path()).expect("Local file 
metadata");
-    let meta = ObjectMeta {
-        location,
-        last_modified: 
metadata.modified().map(chrono::DateTime::from).unwrap(),
-        size: metadata.len() as usize,
-        e_tag: None,
-        version: None,
-    };
-
-    let partitioned_file = PartitionedFile {
-        object_meta: meta,
-        partition_values: vec![],
-        range: None,
-        statistics: None,
-        extensions: None,
-    };
-
-    let f1 = Field::new("id", DataType::Int32, true);
-    let f2 = Field::new("extra_column", DataType::Utf8, true);
-
-    let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
-
-    // prepare the scan
-    let parquet_exec = ParquetExec::new(
-        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
-            .with_file(partitioned_file),
-        None,
-        None,
-        Default::default(),
-    )
-    .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}));
-
-    let session_ctx = SessionContext::new();
-    let task_ctx = session_ctx.task_ctx();
-    let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();
-
-    let expected = [
-        "+----+--------------+",
-        "| id | extra_column |",
-        "+----+--------------+",
-        "| 1  | foo          |",
-        "+----+--------------+",
-    ];
-
-    assert_batches_sorted_eq!(expected, &read);
-}
-
-#[derive(Debug)]
-struct TestSchemaAdapterFactory {}
-
-impl SchemaAdapterFactory for TestSchemaAdapterFactory {
-    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
-        Box::new(TestSchemaAdapter {
-            table_schema: schema,
-        })
-    }
-}
-
-struct TestSchemaAdapter {
-    /// Schema for the table
-    table_schema: SchemaRef,
-}
-
-impl SchemaAdapter for TestSchemaAdapter {
-    fn map_column_index(&self, index: usize, file_schema: &Schema) -> 
Option<usize> {
-        let field = self.table_schema.field(index);
-        Some(file_schema.fields.find(field.name())?.0)
-    }
-
-    fn map_schema(
-        &self,
-        file_schema: &Schema,
-    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-        let mut projection = Vec::with_capacity(file_schema.fields().len());
-
-        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
-            if self.table_schema.fields().find(file_field.name()).is_some() {
-                projection.push(file_idx);
-            }
-        }
-
-        Ok((Arc::new(TestSchemaMapping {}), projection))
-    }
-}
-
-#[derive(Debug)]
-struct TestSchemaMapping {}
-
-impl SchemaMapper for TestSchemaMapping {
-    fn map_batch(&self, batch: RecordBatch) -> 
datafusion_common::Result<RecordBatch> {
-        let f1 = Field::new("id", DataType::Int32, true);
-        let f2 = Field::new("extra_column", DataType::Utf8, true);
-
-        let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
-
-        let extra_column = Arc::new(StringArray::from(vec!["foo"]));
-        let mut new_columns = batch.columns().to_vec();
-        new_columns.push(extra_column);
-
-        Ok(RecordBatch::try_new(schema, new_columns).unwrap())
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to