This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f3477608b6 Minor: Extract `FileScanConfig` into its own module (#7335)
f3477608b6 is described below

commit f3477608b6978bd9409b165fa9e36cae5a97812a
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Aug 21 14:21:05 2023 -0400

    Minor: Extract `FileScanConfig` into its own module (#7335)
    
    * Extract `FileScanConfig` into its own module
    
    * fix warnings
---
 .../datasource/physical_plan/file_scan_config.rs   | 796 +++++++++++++++++++++
 .../core/src/datasource/physical_plan/mod.rs       | 781 +-------------------
 2 files changed, 813 insertions(+), 764 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
new file mode 100644
index 0000000000..ffb4f0902a
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -0,0 +1,796 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`FileScanConfig`] to configure scanning of possibly partitioned
+//! file sources.
+
+use crate::datasource::{
+    listing::{FileRange, PartitionedFile},
+    object_store::ObjectStoreUrl,
+};
+use crate::physical_plan::ExecutionPlan;
+use crate::{
+    error::{DataFusionError, Result},
+    scalar::ScalarValue,
+};
+
+use arrow::array::{ArrayData, BufferBuilder};
+use arrow::buffer::Buffer;
+use arrow::datatypes::{ArrowNativeType, UInt16Type};
+use arrow_array::{ArrayRef, DictionaryArray, RecordBatch};
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use datafusion_common::tree_node::{TreeNode, VisitRecursion};
+use datafusion_common::{ColumnStatistics, Statistics};
+use datafusion_physical_expr::LexOrdering;
+
+use itertools::Itertools;
+use log::warn;
+use std::{
+    borrow::Cow, cmp::min, collections::HashMap, fmt::Debug, marker::PhantomData,
+    sync::Arc, vec,
+};
+
+use super::get_projected_output_ordering;
+
+/// Convert a type to one suitable for use as a [`ListingTable`]
+/// partition column. Returns `Dictionary(UInt16, val_type)`, which is
+/// a reasonable trade-off between the number of representable partition
+/// values and space efficiency.
+///
+/// Use this to specify types for partition columns. However,
+/// you MAY also choose not to dictionary-encode the data or to use a
+/// different dictionary type.
+///
+/// Use [`wrap_partition_value_in_dict`] to wrap a [`ScalarValue`] in the same
+/// way.
+///
+/// [`ListingTable`]: crate::datasource::listing::ListingTable
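+///
+/// A minimal usage sketch (the public re-export paths are assumed):
+///
+/// ```
+/// # use datafusion::datasource::physical_plan::{
+/// #     wrap_partition_type_in_dict, wrap_partition_value_in_dict,
+/// # };
+/// # use datafusion::arrow::datatypes::DataType;
+/// # use datafusion::scalar::ScalarValue;
+/// // Declare a partition column as dictionary-encoded Utf8...
+/// let part_type = wrap_partition_type_in_dict(DataType::Utf8);
+/// assert_eq!(
+///     part_type,
+///     DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
+/// );
+/// // ...and wrap its values to match.
+/// let part_val = wrap_partition_value_in_dict(ScalarValue::Utf8(Some("2021".to_string())));
+/// assert_eq!(part_val.get_datatype(), part_type);
+/// ```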
+pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
+    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
+}
+
+/// Wrap a partition-column [`ScalarValue`] in the dictionary type
+/// described in the documentation of [`wrap_partition_type_in_dict`].
+pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
+    ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
+}
+
+/// Get all of the [`PartitionedFile`] to be scanned for an [`ExecutionPlan`]
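+///
+/// A hedged sketch of consuming the result (assumes `plan` is an existing
+/// `Arc<dyn ExecutionPlan>`):
+///
+/// ```ignore
+/// let scan_files = get_scan_files(plan)?;
+/// // One entry per file-scanning node in the plan; each entry holds that
+/// // node's `file_groups`, i.e. its partitions of `PartitionedFile`s.
+/// for file_groups in &scan_files {
+///     for (partition, files) in file_groups.iter().enumerate() {
+///         println!("partition {partition}: {} files", files.len());
+///     }
+/// }
+/// ```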
+pub fn get_scan_files(
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Vec<Vec<Vec<PartitionedFile>>>> {
+    let mut collector: Vec<Vec<Vec<PartitionedFile>>> = vec![];
+    plan.apply(&mut |plan| {
+        if let Some(file_scan_config) = plan.file_scan_config() {
+            collector.push(file_scan_config.file_groups.clone());
+            Ok(VisitRecursion::Skip)
+        } else {
+            Ok(VisitRecursion::Continue)
+        }
+    })?;
+    Ok(collector)
+}
+
+/// The base configurations to provide when creating a physical plan for
+/// any given file format.
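+///
+/// A hedged construction sketch (`file_schema` and `partitioned_file` are
+/// assumed to exist; the field values are illustrative only):
+///
+/// ```ignore
+/// let config = FileScanConfig {
+///     object_store_url: ObjectStoreUrl::local_filesystem(),
+///     file_schema,
+///     file_groups: vec![vec![partitioned_file]],
+///     statistics: Statistics::default(),
+///     projection: Some(vec![0, 1]),
+///     limit: None,
+///     table_partition_cols: vec![],
+///     output_ordering: vec![],
+///     infinite_source: false,
+/// };
+/// ```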
+#[derive(Clone)]
+pub struct FileScanConfig {
+    /// Object store URL, used to get an [`ObjectStore`] instance from
+    /// [`RuntimeEnv::object_store`]
+    ///
+    /// [`ObjectStore`]: object_store::ObjectStore
+    /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
+    pub object_store_url: ObjectStoreUrl,
+    /// Schema before `projection` is applied. It contains all the columns that may
+    /// appear in the files. It does not include table partition columns
+    /// that may be added.
+    pub file_schema: SchemaRef,
+    /// List of files to be processed, grouped into partitions
+    ///
+    /// Each file must have a schema of `file_schema` or a subset. If
+    /// a particular file has a subset, the missing columns are
+    /// padded with NULLs.
+    ///
+    /// DataFusion may attempt to read each partition of files
+    /// concurrently, however files *within* a partition will be read
+    /// sequentially, one after the next.
+    pub file_groups: Vec<Vec<PartitionedFile>>,
+    /// Estimated overall statistics of the files, taking `filters` into account.
+    pub statistics: Statistics,
+    /// Columns on which to project the data. Indexes that are higher than the
+    /// number of columns of `file_schema` refer to `table_partition_cols`.
+    pub projection: Option<Vec<usize>>,
+    /// The maximum number of records to read from this plan. If `None`,
+    /// all records after filtering are returned.
+    pub limit: Option<usize>,
+    /// The partitioning columns
+    pub table_partition_cols: Vec<(String, DataType)>,
+    /// All equivalent lexicographical orderings that describe the schema.
+    pub output_ordering: Vec<LexOrdering>,
+    /// Indicates whether this plan may produce an infinite stream of records.
+    pub infinite_source: bool,
+}
+
+impl FileScanConfig {
+    /// Project the schema and the statistics on the given column indices
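+    ///
+    /// For example (a sketch): with `file_schema = [c1, c2, c3]`, one partition
+    /// column `date`, and `projection = Some(vec![3, 0])`, the projected schema
+    /// is `[date, c1]`, since index 3 is past the end of the file schema and
+    /// therefore refers to the first partition column.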
+    pub fn project(&self) -> (SchemaRef, Statistics, Vec<LexOrdering>) {
+        if self.projection.is_none() && self.table_partition_cols.is_empty() {
+            return (
+                Arc::clone(&self.file_schema),
+                self.statistics.clone(),
+                self.output_ordering.clone(),
+            );
+        }
+
+        let proj_iter: Box<dyn Iterator<Item = usize>> = match &self.projection {
+            Some(proj) => Box::new(proj.iter().copied()),
+            None => Box::new(
+                0..(self.file_schema.fields().len() + self.table_partition_cols.len()),
+            ),
+        };
+
+        let mut table_fields = vec![];
+        let mut table_cols_stats = vec![];
+        for idx in proj_iter {
+            if idx < self.file_schema.fields().len() {
+                table_fields.push(self.file_schema.field(idx).clone());
+                if let Some(file_cols_stats) = &self.statistics.column_statistics {
+                    table_cols_stats.push(file_cols_stats[idx].clone())
+                } else {
+                    table_cols_stats.push(ColumnStatistics::default())
+                }
+            } else {
+                let partition_idx = idx - self.file_schema.fields().len();
+                table_fields.push(Field::new(
+                    &self.table_partition_cols[partition_idx].0,
+                    self.table_partition_cols[partition_idx].1.to_owned(),
+                    false,
+                ));
+                // TODO provide accurate stat for partition column (#1186)
+                table_cols_stats.push(ColumnStatistics::default())
+            }
+        }
+
+        let table_stats = Statistics {
+            num_rows: self.statistics.num_rows,
+            is_exact: self.statistics.is_exact,
+            // TODO correct byte size?
+            total_byte_size: None,
+            column_statistics: Some(table_cols_stats),
+        };
+
+        let table_schema = Arc::new(
+            Schema::new(table_fields).with_metadata(self.file_schema.metadata().clone()),
+        );
+        let projected_output_ordering =
+            get_projected_output_ordering(self, &table_schema);
+        (table_schema, table_stats, projected_output_ordering)
+    }
+
+    #[allow(unused)] // Only used by avro
+    pub(crate) fn projected_file_column_names(&self) -> Option<Vec<String>> {
+        self.projection.as_ref().map(|p| {
+            p.iter()
+                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
+                .map(|col_idx| self.file_schema.field(*col_idx).name())
+                .cloned()
+                .collect()
+        })
+    }
+
+    pub(crate) fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
+        self.projection.as_ref().map(|p| {
+            p.iter()
+                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
+                .copied()
+                .collect()
+        })
+    }
+
+    /// Repartition all input files into `target_partitions` partitions, if the total
+    /// file size exceeds `repartition_file_min_size`.
+    /// `target_partitions` and `repartition_file_min_size` come directly from configuration.
+    ///
+    /// This function only tries to partition file byte ranges evenly; the specific
+    /// `FileOpener` does the actual partitioning for its data source type (e.g. `CsvOpener`
+    /// will only read lines that overlap with the byte range, but also handles boundaries
+    /// so that all lines are read exactly once).
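+    ///
+    /// An illustrative sketch (the sizes are hypothetical): two 100-byte files `A` and
+    /// `B` split across three target partitions give a target partition size of
+    /// `ceil(200 / 3) = 67` bytes, producing the groups `[A 0..67]`,
+    /// `[A 67..100, B 0..34]` and `[B 34..100]`.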
+    pub fn repartition_file_groups(
+        file_groups: Vec<Vec<PartitionedFile>>,
+        target_partitions: usize,
+        repartition_file_min_size: usize,
+    ) -> Option<Vec<Vec<PartitionedFile>>> {
+        let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
+
+        // Perform redistribution only if all files should be read from beginning to end
+        let has_ranges = flattened_files.iter().any(|f| f.range.is_some());
+        if has_ranges {
+            return None;
+        }
+
+        let total_size = flattened_files
+            .iter()
+            .map(|f| f.object_meta.size as i64)
+            .sum::<i64>();
+        if total_size < (repartition_file_min_size as i64) || total_size == 0 {
+            return None;
+        }
+
+        let target_partition_size =
+            (total_size as usize + (target_partitions) - 1) / (target_partitions);
+
+        let current_partition_index: usize = 0;
+        let current_partition_size: usize = 0;
+
+        // Partition byte range evenly for all `PartitionedFile`s
+        let repartitioned_files = flattened_files
+            .into_iter()
+            .scan(
+                (current_partition_index, current_partition_size),
+                |state, source_file| {
+                    let mut produced_files = vec![];
+                    let mut range_start = 0;
+                    while range_start < source_file.object_meta.size {
+                        let range_end = min(
+                            range_start + (target_partition_size - state.1),
+                            source_file.object_meta.size,
+                        );
+
+                        let mut produced_file = source_file.clone();
+                        produced_file.range = Some(FileRange {
+                            start: range_start as i64,
+                            end: range_end as i64,
+                        });
+                        produced_files.push((state.0, produced_file));
+
+                        if state.1 + (range_end - range_start) >= target_partition_size {
+                            state.0 += 1;
+                            state.1 = 0;
+                        } else {
+                            state.1 += range_end - range_start;
+                        }
+                        range_start = range_end;
+                    }
+                    Some(produced_files)
+                },
+            )
+            .flatten()
+            .group_by(|(partition_idx, _)| *partition_idx)
+            .into_iter()
+            .map(|(_, group)| group.map(|(_, vals)| vals).collect_vec())
+            .collect_vec();
+
+        Some(repartitioned_files)
+    }
+}
+
+/// A helper that projects partition columns into the file record batches.
+///
+/// One interesting trick is the use of a cache for the key buffers of the partition column
+/// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them
+/// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches,
+/// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count).
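+///
+/// For example (a sketch): projecting the partition value `"2021"` into a batch of
+/// 1024 rows materializes a `DictionaryArray<UInt16>` with a single-entry values
+/// array `["2021"]` and 1024 zero keys, and that zero-key buffer is then shared
+/// with every later batch of up to 1024 rows.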
+pub struct PartitionColumnProjector {
+    /// An Arrow buffer initialized to zeros that represents the key array of all partition
+    /// columns (partition columns are materialized by dictionary arrays with only one
+    /// value in the dictionary, thus all the keys are equal to zero).
+    key_buffer_cache: ZeroBufferGenerators,
+    /// Mapping between the indexes in the list of partition columns and the target
+    /// schema. Sorted by index in the target schema so that we can iterate on it to
+    /// insert the partition columns in the target record batch.
+    projected_partition_indexes: Vec<(usize, usize)>,
+    /// The schema of the table once the projection was applied.
+    projected_schema: SchemaRef,
+}
+
+impl PartitionColumnProjector {
+    // Create a projector to insert the partitioning columns into batches read from files
+    // - `projected_schema`: the target schema with both file and partitioning columns
+    // - `table_partition_cols`: all the partitioning column names
+    pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
+        let mut idx_map = HashMap::new();
+        for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
+            if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
+                idx_map.insert(partition_idx, schema_idx);
+            }
+        }
+
+        let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect();
+        projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));
+
+        Self {
+            projected_partition_indexes,
+            key_buffer_cache: Default::default(),
+            projected_schema,
+        }
+    }
+
+    // Transform the batch read from the file by inserting the partitioning columns
+    // to the right positions as deduced from `projected_schema`
+    // - `file_batch`: batch read from the file, with internal projection applied
+    // - `partition_values`: the list of partition values, one for each partition column
+    pub fn project(
+        &mut self,
+        file_batch: RecordBatch,
+        partition_values: &[ScalarValue],
+    ) -> Result<RecordBatch> {
+        let expected_cols =
+            self.projected_schema.fields().len() - self.projected_partition_indexes.len();
+
+        if file_batch.columns().len() != expected_cols {
+            return Err(DataFusionError::Execution(format!(
+                "Unexpected batch schema from file, expected {} cols but got 
{}",
+                expected_cols,
+                file_batch.columns().len()
+            )));
+        }
+        let mut cols = file_batch.columns().to_vec();
+        for &(pidx, sidx) in &self.projected_partition_indexes {
+            let mut partition_value = Cow::Borrowed(&partition_values[pidx]);
+
+            // check if user forgot to dict-encode the partition value
+            let field = self.projected_schema.field(sidx);
+            let expected_data_type = field.data_type();
+            let actual_data_type = partition_value.get_datatype();
+            if let DataType::Dictionary(key_type, _) = expected_data_type {
+                if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
+                    warn!("Partition value for column {} was not 
dictionary-encoded, applied auto-fix.", field.name());
+                    partition_value = Cow::Owned(ScalarValue::Dictionary(
+                        key_type.clone(),
+                        Box::new(partition_value.as_ref().clone()),
+                    ));
+                }
+            }
+
+            cols.insert(
+                sidx,
+                create_output_array(
+                    &mut self.key_buffer_cache,
+                    partition_value.as_ref(),
+                    file_batch.num_rows(),
+                ),
+            )
+        }
+        RecordBatch::try_new(Arc::clone(&self.projected_schema), cols).map_err(Into::into)
+    }
+}
+
+#[derive(Debug, Default)]
+struct ZeroBufferGenerators {
+    gen_i8: ZeroBufferGenerator<i8>,
+    gen_i16: ZeroBufferGenerator<i16>,
+    gen_i32: ZeroBufferGenerator<i32>,
+    gen_i64: ZeroBufferGenerator<i64>,
+    gen_u8: ZeroBufferGenerator<u8>,
+    gen_u16: ZeroBufferGenerator<u16>,
+    gen_u32: ZeroBufferGenerator<u32>,
+    gen_u64: ZeroBufferGenerator<u64>,
+}
+
+/// Generate an arrow [`Buffer`] that contains zero values.
+#[derive(Debug, Default)]
+struct ZeroBufferGenerator<T>
+where
+    T: ArrowNativeType,
+{
+    cache: Option<Buffer>,
+    _t: PhantomData<T>,
+}
+
+impl<T> ZeroBufferGenerator<T>
+where
+    T: ArrowNativeType,
+{
+    const SIZE: usize = std::mem::size_of::<T>();
+
+    fn get_buffer(&mut self, n_vals: usize) -> Buffer {
+        match &mut self.cache {
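+            // Cache hit: an earlier zero buffer is large enough, so hand out a
+            // cheap zero-copy slice of it.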
+            Some(buf) if buf.len() >= n_vals * Self::SIZE => {
+                buf.slice_with_length(0, n_vals * Self::SIZE)
+            }
+            _ => {
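+                // Cache miss (or the cached buffer is too small): build a fresh
+                // all-zero buffer, cache it, and return a clone of it.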
+                let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
+                key_buffer_builder.advance(n_vals); // keys are all 0
+                self.cache.insert(key_buffer_builder.finish()).clone()
+            }
+        }
+    }
+}
+
+fn create_dict_array<T>(
+    buffer_gen: &mut ZeroBufferGenerator<T>,
+    dict_val: &ScalarValue,
+    len: usize,
+    data_type: DataType,
+) -> ArrayRef
+where
+    T: ArrowNativeType,
+{
+    let dict_vals = dict_val.to_array();
+
+    let sliced_key_buffer = buffer_gen.get_buffer(len);
+
+    // assemble pieces together
+    let mut builder = ArrayData::builder(data_type)
+        .len(len)
+        .add_buffer(sliced_key_buffer);
+    builder = builder.add_child_data(dict_vals.to_data());
+    Arc::new(DictionaryArray::<UInt16Type>::from(
+        builder.build().unwrap(),
+    ))
+}
+
+fn create_output_array(
+    key_buffer_cache: &mut ZeroBufferGenerators,
+    val: &ScalarValue,
+    len: usize,
+) -> ArrayRef {
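+    // Dispatch on the dictionary key type so the cached zero-key buffer has the
+    // right byte width; non-dictionary values fall through to a plain
+    // `to_array_of_size` of the repeated value.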
+    if let ScalarValue::Dictionary(key_type, dict_val) = &val {
+        match key_type.as_ref() {
+            DataType::Int8 => {
+                return create_dict_array(
+                    &mut key_buffer_cache.gen_i8,
+                    dict_val,
+                    len,
+                    val.get_datatype(),
+                );
+            }
+            DataType::Int16 => {
+                return create_dict_array(
+                    &mut key_buffer_cache.gen_i16,
+                    dict_val,
+                    len,
+                    val.get_datatype(),
+                );
+            }
+            DataType::Int32 => {
+                return create_dict_array(
+                    &mut key_buffer_cache.gen_i32,
+                    dict_val,
+                    len,
+                    val.get_datatype(),
+                );
+            }
+            DataType::Int64 => {
+                return create_dict_array(
+                    &mut key_buffer_cache.gen_i64,
+                    dict_val,
+                    len,
+                    val.get_datatype(),
+                );
+            }
+            DataType::UInt8 => {
+                return create_dict_array(
+                    &mut key_buffer_cache.gen_u8,
+                    dict_val,
+                    len,
+                    val.get_datatype(),
+                );
+            }
+            DataType::UInt16 => {
+                return create_dict_array(
+                    &mut key_buffer_cache.gen_u16,
+                    dict_val,
+                    len,
+                    val.get_datatype(),
+                );
+            }
+            DataType::UInt32 => {
+                return create_dict_array(
+                    &mut key_buffer_cache.gen_u32,
+                    dict_val,
+                    len,
+                    val.get_datatype(),
+                );
+            }
+            DataType::UInt64 => {
+                return create_dict_array(
+                    &mut key_buffer_cache.gen_u64,
+                    dict_val,
+                    len,
+                    val.get_datatype(),
+                );
+            }
+            _ => {}
+        }
+    }
+
+    val.to_array_of_size(len)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{
+        test::{build_table_i32, columns},
+        test_util::aggr_test_schema,
+    };
+
+    #[test]
+    fn physical_plan_config_no_projection() {
+        let file_schema = aggr_test_schema();
+        let conf = config_for_projection(
+            Arc::clone(&file_schema),
+            None,
+            Statistics::default(),
+            vec![(
+                "date".to_owned(),
+                wrap_partition_type_in_dict(DataType::Utf8),
+            )],
+        );
+
+        let (proj_schema, proj_statistics, _) = conf.project();
+        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
+        assert_eq!(
+            proj_schema.field(file_schema.fields().len()).name(),
+            "date",
+            "partition columns are the last columns"
+        );
+        assert_eq!(
+            proj_statistics
+                .column_statistics
+                .expect("projection creates column statistics")
+                .len(),
+            file_schema.fields().len() + 1
+        );
+        // TODO implement tests for partition column statistics once implemented
+
+        let col_names = conf.projected_file_column_names();
+        assert_eq!(col_names, None);
+
+        let col_indices = conf.file_column_projection_indices();
+        assert_eq!(col_indices, None);
+    }
+
+    #[test]
+    fn physical_plan_config_with_projection() {
+        let file_schema = aggr_test_schema();
+        let conf = config_for_projection(
+            Arc::clone(&file_schema),
+            Some(vec![file_schema.fields().len(), 0]),
+            Statistics {
+                num_rows: Some(10),
+                // assign the column index to distinct_count to help assert
+                // the source statistic after the projection
+                column_statistics: Some(
+                    (0..file_schema.fields().len())
+                        .map(|i| ColumnStatistics {
+                            distinct_count: Some(i),
+                            ..Default::default()
+                        })
+                        .collect(),
+                ),
+                ..Default::default()
+            },
+            vec![(
+                "date".to_owned(),
+                wrap_partition_type_in_dict(DataType::Utf8),
+            )],
+        );
+
+        let (proj_schema, proj_statistics, _) = conf.project();
+        assert_eq!(
+            columns(&proj_schema),
+            vec!["date".to_owned(), "c1".to_owned()]
+        );
+        let proj_stat_cols = proj_statistics
+            .column_statistics
+            .expect("projection creates column statistics");
+        assert_eq!(proj_stat_cols.len(), 2);
+        // TODO implement tests for proj_stat_cols[0] once partition column
+        // statistics are implemented
+        assert_eq!(proj_stat_cols[1].distinct_count, Some(0));
+
+        let col_names = conf.projected_file_column_names();
+        assert_eq!(col_names, Some(vec!["c1".to_owned()]));
+
+        let col_indices = conf.file_column_projection_indices();
+        assert_eq!(col_indices, Some(vec![0]));
+    }
+
+    #[test]
+    fn partition_column_projector() {
+        let file_batch = build_table_i32(
+            ("a", &vec![0, 1, 2]),
+            ("b", &vec![-2, -1, 0]),
+            ("c", &vec![10, 11, 12]),
+        );
+        let partition_cols = vec![
+            (
+                "year".to_owned(),
+                wrap_partition_type_in_dict(DataType::Utf8),
+            ),
+            (
+                "month".to_owned(),
+                wrap_partition_type_in_dict(DataType::Utf8),
+            ),
+            (
+                "day".to_owned(),
+                wrap_partition_type_in_dict(DataType::Utf8),
+            ),
+        ];
+        // create a projected schema
+        let conf = config_for_projection(
+            file_batch.schema(),
+            // keep all cols from file and 2 from partitioning
+            Some(vec![
+                0,
+                1,
+                2,
+                file_batch.schema().fields().len(),
+                file_batch.schema().fields().len() + 2,
+            ]),
+            Statistics::default(),
+            partition_cols.clone(),
+        );
+        let (proj_schema, ..) = conf.project();
+        // created a projector for that projected schema
+        let mut proj = PartitionColumnProjector::new(
+            proj_schema,
+            &partition_cols
+                .iter()
+                .map(|x| x.0.clone())
+                .collect::<Vec<_>>(),
+        );
+
+        // project first batch
+        let projected_batch = proj
+            .project(
+                // file_batch is ok here because we kept all the file cols in the projection
+                file_batch,
+                &[
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                        "2021".to_owned(),
+                    ))),
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                        "10".to_owned(),
+                    ))),
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                        "26".to_owned(),
+                    ))),
+                ],
+            )
+            .expect("Projection of partition columns into record batch 
failed");
+        let expected = vec![
+            "+---+----+----+------+-----+",
+            "| a | b  | c  | year | day |",
+            "+---+----+----+------+-----+",
+            "| 0 | -2 | 10 | 2021 | 26  |",
+            "| 1 | -1 | 11 | 2021 | 26  |",
+            "| 2 | 0  | 12 | 2021 | 26  |",
+            "+---+----+----+------+-----+",
+        ];
+        crate::assert_batches_eq!(expected, &[projected_batch]);
+
+        // project another batch that is larger than the previous one
+        let file_batch = build_table_i32(
+            ("a", &vec![5, 6, 7, 8, 9]),
+            ("b", &vec![-10, -9, -8, -7, -6]),
+            ("c", &vec![12, 13, 14, 15, 16]),
+        );
+        let projected_batch = proj
+            .project(
+                // file_batch is ok here because we kept all the file cols in the projection
+                file_batch,
+                &[
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                        "2021".to_owned(),
+                    ))),
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                        "10".to_owned(),
+                    ))),
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                        "27".to_owned(),
+                    ))),
+                ],
+            )
+            .expect("Projection of partition columns into record batch 
failed");
+        let expected = vec![
+            "+---+-----+----+------+-----+",
+            "| a | b   | c  | year | day |",
+            "+---+-----+----+------+-----+",
+            "| 5 | -10 | 12 | 2021 | 27  |",
+            "| 6 | -9  | 13 | 2021 | 27  |",
+            "| 7 | -8  | 14 | 2021 | 27  |",
+            "| 8 | -7  | 15 | 2021 | 27  |",
+            "| 9 | -6  | 16 | 2021 | 27  |",
+            "+---+-----+----+------+-----+",
+        ];
+        crate::assert_batches_eq!(expected, &[projected_batch]);
+
+        // project another batch that is smaller than the previous one
+        let file_batch = build_table_i32(
+            ("a", &vec![0, 1, 3]),
+            ("b", &vec![2, 3, 4]),
+            ("c", &vec![4, 5, 6]),
+        );
+        let projected_batch = proj
+            .project(
+                // file_batch is ok here because we kept all the file cols in the projection
+                file_batch,
+                &[
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                        "2021".to_owned(),
+                    ))),
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                        "10".to_owned(),
+                    ))),
+                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
+                        "28".to_owned(),
+                    ))),
+                ],
+            )
+            .expect("Projection of partition columns into record batch 
failed");
+        let expected = vec![
+            "+---+---+---+------+-----+",
+            "| a | b | c | year | day |",
+            "+---+---+---+------+-----+",
+            "| 0 | 2 | 4 | 2021 | 28  |",
+            "| 1 | 3 | 5 | 2021 | 28  |",
+            "| 3 | 4 | 6 | 2021 | 28  |",
+            "+---+---+---+------+-----+",
+        ];
+        crate::assert_batches_eq!(expected, &[projected_batch]);
+
+        // forgot to dictionary-wrap the scalar value
+        let file_batch = build_table_i32(
+            ("a", &vec![0, 1, 2]),
+            ("b", &vec![-2, -1, 0]),
+            ("c", &vec![10, 11, 12]),
+        );
+        let projected_batch = proj
+            .project(
+                // file_batch is ok here because we kept all the file cols in the projection
+                file_batch,
+                &[
+                    ScalarValue::Utf8(Some("2021".to_owned())),
+                    ScalarValue::Utf8(Some("10".to_owned())),
+                    ScalarValue::Utf8(Some("26".to_owned())),
+                ],
+            )
+            .expect("Projection of partition columns into record batch 
failed");
+        let expected = vec![
+            "+---+----+----+------+-----+",
+            "| a | b  | c  | year | day |",
+            "+---+----+----+------+-----+",
+            "| 0 | -2 | 10 | 2021 | 26  |",
+            "| 1 | -1 | 11 | 2021 | 26  |",
+            "| 2 | 0  | 12 | 2021 | 26  |",
+            "+---+----+----+------+-----+",
+        ];
+        crate::assert_batches_eq!(expected, &[projected_batch]);
+    }
+
+    // sets default for configs that play no role in projections
+    fn config_for_projection(
+        file_schema: SchemaRef,
+        projection: Option<Vec<usize>>,
+        statistics: Statistics,
+        table_partition_cols: Vec<(String, DataType)>,
+    ) -> FileScanConfig {
+        FileScanConfig {
+            file_schema,
+            file_groups: vec![vec![]],
+            limit: None,
+            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+            projection,
+            statistics,
+            table_partition_cols,
+            output_ordering: vec![],
+            infinite_source: false,
+        }
+    }
+}
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 06c16ad751..ecaf71ff54 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -31,20 +31,25 @@ pub use self::csv::{CsvConfig, CsvExec, CsvOpener};
 pub(crate) use self::parquet::plan_to_parquet;
 pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory};
 use arrow::{
-    array::{new_null_array, ArrayData, ArrayRef, BufferBuilder, DictionaryArray},
-    buffer::Buffer,
+    array::new_null_array,
     compute::can_cast_types,
-    datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
+    datatypes::{DataType, Schema, SchemaRef},
     record_batch::{RecordBatch, RecordBatchOptions},
 };
 pub use arrow_file::ArrowExec;
 pub use avro::AvroExec;
-use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
+use datafusion_physical_expr::PhysicalSortExpr;
 pub use file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
 pub(crate) use json::plan_to_json;
 pub use json::{JsonOpener, NdJsonExec};
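+// `FileScanConfig` and its helpers now live in their own module; re-export them
+// so the public API of `datasource::physical_plan` is unchanged.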
+mod file_scan_config;
+pub(crate) use file_scan_config::PartitionColumnProjector;
+pub use file_scan_config::{
+    get_scan_files, wrap_partition_type_in_dict, wrap_partition_value_in_dict,
+    FileScanConfig,
+};
 
-use crate::physical_plan::ExecutionPlan;
+use crate::error::{DataFusionError, Result};
 use crate::{
     datasource::file_format::write::FileWriterMode,
     physical_plan::{DisplayAs, DisplayFormatType},
@@ -56,264 +61,21 @@ use crate::{
     },
     physical_plan::display::{OutputOrderingDisplay, ProjectSchemaDisplay},
 };
-use crate::{
-    error::{DataFusionError, Result},
-    scalar::ScalarValue,
-};
 
-use datafusion_common::{
-    plan_err,
-    tree_node::{TreeNode, VisitRecursion},
-};
+use datafusion_common::plan_err;
 use datafusion_physical_expr::expressions::Column;
 
 use arrow::compute::cast;
-use itertools::Itertools;
-use log::{debug, warn};
+use log::debug;
 use object_store::path::Path;
 use object_store::ObjectMeta;
 use std::{
-    borrow::Cow,
-    cmp::min,
-    collections::HashMap,
     fmt::{Debug, Formatter, Result as FmtResult},
-    marker::PhantomData,
     sync::Arc,
     vec,
 };
 
-use super::{listing::ListingTableUrl, ColumnStatistics, Statistics};
-
-/// Convert a type to one suitable for use as a [`ListingTable`]
-/// partition column. Returns `Dictionary(UInt16, val_type)`, which is
-/// a reasonable trade-off between the number of representable partition
-/// values and space efficiency.
-///
-/// Use this to specify types for partition columns. However,
-/// you MAY also choose not to dictionary-encode the data or to use a
-/// different dictionary type.
-///
-/// Use [`wrap_partition_value_in_dict`] to wrap a [`ScalarValue`] in the same
-/// way.
-///
-/// [`ListingTable`]: crate::datasource::listing::ListingTable
-pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
-    DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
-}
-
-/// Wrap a partition-column [`ScalarValue`] in the dictionary type
-/// described in the documentation of [`wrap_partition_type_in_dict`].
-pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
-    ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
-}
-
-/// Get all of the [`PartitionedFile`] to be scanned for an [`ExecutionPlan`]
-pub fn get_scan_files(
-    plan: Arc<dyn ExecutionPlan>,
-) -> Result<Vec<Vec<Vec<PartitionedFile>>>> {
-    let mut collector: Vec<Vec<Vec<PartitionedFile>>> = vec![];
-    plan.apply(&mut |plan| {
-        if let Some(file_scan_config) = plan.file_scan_config() {
-            collector.push(file_scan_config.file_groups.clone());
-            Ok(VisitRecursion::Skip)
-        } else {
-            Ok(VisitRecursion::Continue)
-        }
-    })?;
-    Ok(collector)
-}
-
-/// The base configurations to provide when creating a physical plan for
-/// any given file format.
-#[derive(Clone)]
-pub struct FileScanConfig {
-    /// Object store URL, used to get an [`ObjectStore`] instance from
-    /// [`RuntimeEnv::object_store`]
-    ///
-    /// [`ObjectStore`]: object_store::ObjectStore
-    /// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
-    pub object_store_url: ObjectStoreUrl,
-    /// Schema before `projection` is applied. It contains all the columns that may
-    /// appear in the files. It does not include table partition columns
-    /// that may be added.
-    pub file_schema: SchemaRef,
-    /// List of files to be processed, grouped into partitions
-    ///
-    /// Each file must have a schema of `file_schema` or a subset. If
-    /// a particular file has a subset, the missing columns are
-    /// padded with NULLs.
-    ///
-    /// DataFusion may attempt to read each partition of files
-    /// concurrently, however files *within* a partition will be read
-    /// sequentially, one after the next.
-    pub file_groups: Vec<Vec<PartitionedFile>>,
-    /// Estimated overall statistics of the files, taking `filters` into account.
-    pub statistics: Statistics,
-    /// Columns on which to project the data. Indexes that are higher than the
-    /// number of columns of `file_schema` refer to `table_partition_cols`.
-    pub projection: Option<Vec<usize>>,
-    /// The maximum number of records to read from this plan. If `None`,
-    /// all records after filtering are returned.
-    pub limit: Option<usize>,
-    /// The partitioning columns
-    pub table_partition_cols: Vec<(String, DataType)>,
-    /// All equivalent lexicographical orderings that describe the schema.
-    pub output_ordering: Vec<LexOrdering>,
-    /// Indicates whether this plan may produce an infinite stream of records.
-    pub infinite_source: bool,
-}
-
-impl FileScanConfig {
-    /// Project the schema and the statistics on the given column indices
-    pub fn project(&self) -> (SchemaRef, Statistics, Vec<LexOrdering>) {
-        if self.projection.is_none() && self.table_partition_cols.is_empty() {
-            return (
-                Arc::clone(&self.file_schema),
-                self.statistics.clone(),
-                self.output_ordering.clone(),
-            );
-        }
-
-        let proj_iter: Box<dyn Iterator<Item = usize>> = match &self.projection {
-            Some(proj) => Box::new(proj.iter().copied()),
-            None => Box::new(
-                0..(self.file_schema.fields().len() + self.table_partition_cols.len()),
-            ),
-        };
-
-        let mut table_fields = vec![];
-        let mut table_cols_stats = vec![];
-        for idx in proj_iter {
-            if idx < self.file_schema.fields().len() {
-                table_fields.push(self.file_schema.field(idx).clone());
-                if let Some(file_cols_stats) = &self.statistics.column_statistics {
-                    table_cols_stats.push(file_cols_stats[idx].clone())
-                } else {
-                    table_cols_stats.push(ColumnStatistics::default())
-                }
-            } else {
-                let partition_idx = idx - self.file_schema.fields().len();
-                table_fields.push(Field::new(
-                    &self.table_partition_cols[partition_idx].0,
-                    self.table_partition_cols[partition_idx].1.to_owned(),
-                    false,
-                ));
-                // TODO provide accurate stat for partition column (#1186)
-                table_cols_stats.push(ColumnStatistics::default())
-            }
-        }
-
-        let table_stats = Statistics {
-            num_rows: self.statistics.num_rows,
-            is_exact: self.statistics.is_exact,
-            // TODO correct byte size?
-            total_byte_size: None,
-            column_statistics: Some(table_cols_stats),
-        };
-
-        let table_schema = Arc::new(
-            Schema::new(table_fields).with_metadata(self.file_schema.metadata().clone()),
-        );
-        let projected_output_ordering =
-            get_projected_output_ordering(self, &table_schema);
-        (table_schema, table_stats, projected_output_ordering)
-    }
-
-    #[allow(unused)] // Only used by avro
-    fn projected_file_column_names(&self) -> Option<Vec<String>> {
-        self.projection.as_ref().map(|p| {
-            p.iter()
-                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
-                .map(|col_idx| self.file_schema.field(*col_idx).name())
-                .cloned()
-                .collect()
-        })
-    }
-
-    fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
-        self.projection.as_ref().map(|p| {
-            p.iter()
-                .filter(|col_idx| **col_idx < self.file_schema.fields().len())
-                .copied()
-                .collect()
-        })
-    }
-
-    /// Repartition all input files into `target_partitions` partitions, if the total
-    /// file size exceeds `repartition_file_min_size`.
-    /// `target_partitions` and `repartition_file_min_size` come directly from configuration.
-    ///
-    /// This function only tries to partition file byte ranges evenly; the specific
-    /// `FileOpener` does the actual partitioning for its data source type (e.g. `CsvOpener`
-    /// will only read lines that overlap with the byte range, but also handles boundaries
-    /// so that all lines are read exactly once).
-    pub fn repartition_file_groups(
-        file_groups: Vec<Vec<PartitionedFile>>,
-        target_partitions: usize,
-        repartition_file_min_size: usize,
-    ) -> Option<Vec<Vec<PartitionedFile>>> {
-        let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
-
-        // Perform redistribution only if all files should be read from beginning to end
-        let has_ranges = flattened_files.iter().any(|f| f.range.is_some());
-        if has_ranges {
-            return None;
-        }
-
-        let total_size = flattened_files
-            .iter()
-            .map(|f| f.object_meta.size as i64)
-            .sum::<i64>();
-        if total_size < (repartition_file_min_size as i64) || total_size == 0 {
-            return None;
-        }
-
-        let target_partition_size =
-            (total_size as usize + (target_partitions) - 1) / (target_partitions);
-
-        let current_partition_index: usize = 0;
-        let current_partition_size: usize = 0;
-
-        // Partition byte range evenly for all `PartitionedFile`s
-        let repartitioned_files = flattened_files
-            .into_iter()
-            .scan(
-                (current_partition_index, current_partition_size),
-                |state, source_file| {
-                    let mut produced_files = vec![];
-                    let mut range_start = 0;
-                    while range_start < source_file.object_meta.size {
-                        let range_end = min(
-                            range_start + (target_partition_size - state.1),
-                            source_file.object_meta.size,
-                        );
-
-                        let mut produced_file = source_file.clone();
-                        produced_file.range = Some(FileRange {
-                            start: range_start as i64,
-                            end: range_end as i64,
-                        });
-                        produced_files.push((state.0, produced_file));
-
-                        if state.1 + (range_end - range_start) >= target_partition_size {
-                            state.0 += 1;
-                            state.1 = 0;
-                        } else {
-                            state.1 += range_end - range_start;
-                        }
-                        range_start = range_end;
-                    }
-                    Some(produced_files)
-                },
-            )
-            .flatten()
-            .group_by(|(partition_idx, _)| *partition_idx)
-            .into_iter()
-            .map(|(_, group)| group.map(|(_, vals)| vals).collect_vec())
-            .collect_vec();
-
-        Some(repartitioned_files)
-    }
-}
+use super::listing::ListingTableUrl;
 
 /// The base configurations to provide when creating a physical plan for
 /// writing to any given file format.
@@ -615,240 +377,6 @@ impl SchemaMapping {
     }
 }
 
-/// A helper that projects partition columns into the file record batches.
-///
-/// One interesting trick is the use of a cache for the key buffers of the partition column
-/// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them
-/// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches,
-/// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count).
-struct PartitionColumnProjector {
-    /// An Arrow buffer initialized to zeros that represents the key array of all partition
-    /// columns (partition columns are materialized by dictionary arrays with only one
-    /// value in the dictionary, thus all the keys are equal to zero).
-    key_buffer_cache: ZeroBufferGenerators,
-    /// Mapping between the indexes in the list of partition columns and the target
-    /// schema. Sorted by index in the target schema so that we can iterate on it to
-    /// insert the partition columns in the target record batch.
-    projected_partition_indexes: Vec<(usize, usize)>,
-    /// The schema of the table once the projection was applied.
-    projected_schema: SchemaRef,
-}
-
-impl PartitionColumnProjector {
-    // Create a projector to insert the partitioning columns into batches read from files
-    // - `projected_schema`: the target schema with both file and partitioning columns
-    // - `table_partition_cols`: all the partitioning column names
-    fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
-        let mut idx_map = HashMap::new();
-        for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
-            if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
-                idx_map.insert(partition_idx, schema_idx);
-            }
-        }
-
-        let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect();
-        projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));
-
-        Self {
-            projected_partition_indexes,
-            key_buffer_cache: Default::default(),
-            projected_schema,
-        }
-    }
-
-    // Transform the batch read from the file by inserting the partitioning columns
-    // to the right positions as deduced from `projected_schema`
-    // - `file_batch`: batch read from the file, with internal projection applied
-    // - `partition_values`: the list of partition values, one for each partition column
-    fn project(
-        &mut self,
-        file_batch: RecordBatch,
-        partition_values: &[ScalarValue],
-    ) -> Result<RecordBatch> {
-        let expected_cols =
-            self.projected_schema.fields().len() - self.projected_partition_indexes.len();
-
-        if file_batch.columns().len() != expected_cols {
-            return Err(DataFusionError::Execution(format!(
-                "Unexpected batch schema from file, expected {} cols but got 
{}",
-                expected_cols,
-                file_batch.columns().len()
-            )));
-        }
-        let mut cols = file_batch.columns().to_vec();
-        for &(pidx, sidx) in &self.projected_partition_indexes {
-            let mut partition_value = Cow::Borrowed(&partition_values[pidx]);
-
-            // check if user forgot to dict-encode the partition value
-            let field = self.projected_schema.field(sidx);
-            let expected_data_type = field.data_type();
-            let actual_data_type = partition_value.get_datatype();
-            if let DataType::Dictionary(key_type, _) = expected_data_type {
-                if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
-                    warn!("Partition value for column {} was not 
dictionary-encoded, applied auto-fix.", field.name());
-                    partition_value = Cow::Owned(ScalarValue::Dictionary(
-                        key_type.clone(),
-                        Box::new(partition_value.as_ref().clone()),
-                    ));
-                }
-            }
-
-            cols.insert(
-                sidx,
-                create_output_array(
-                    &mut self.key_buffer_cache,
-                    partition_value.as_ref(),
-                    file_batch.num_rows(),
-                ),
-            )
-        }
-        RecordBatch::try_new(Arc::clone(&self.projected_schema), cols).map_err(Into::into)
-    }
-}
-
-#[derive(Debug, Default)]
-struct ZeroBufferGenerators {
-    gen_i8: ZeroBufferGenerator<i8>,
-    gen_i16: ZeroBufferGenerator<i16>,
-    gen_i32: ZeroBufferGenerator<i32>,
-    gen_i64: ZeroBufferGenerator<i64>,
-    gen_u8: ZeroBufferGenerator<u8>,
-    gen_u16: ZeroBufferGenerator<u16>,
-    gen_u32: ZeroBufferGenerator<u32>,
-    gen_u64: ZeroBufferGenerator<u64>,
-}
-
-/// Generate an arrow [`Buffer`] that contains zero values.
-#[derive(Debug, Default)]
-struct ZeroBufferGenerator<T>
-where
-    T: ArrowNativeType,
-{
-    cache: Option<Buffer>,
-    _t: PhantomData<T>,
-}
-
-impl<T> ZeroBufferGenerator<T>
-where
-    T: ArrowNativeType,
-{
-    const SIZE: usize = std::mem::size_of::<T>();
-
-    fn get_buffer(&mut self, n_vals: usize) -> Buffer {
-        match &mut self.cache {
-            Some(buf) if buf.len() >= n_vals * Self::SIZE => {
-                buf.slice_with_length(0, n_vals * Self::SIZE)
-            }
-            _ => {
-                let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
-                key_buffer_builder.advance(n_vals); // keys are all 0
-                self.cache.insert(key_buffer_builder.finish()).clone()
-            }
-        }
-    }
-}
-
-fn create_dict_array<T>(
-    buffer_gen: &mut ZeroBufferGenerator<T>,
-    dict_val: &ScalarValue,
-    len: usize,
-    data_type: DataType,
-) -> ArrayRef
-where
-    T: ArrowNativeType,
-{
-    let dict_vals = dict_val.to_array();
-
-    let sliced_key_buffer = buffer_gen.get_buffer(len);
-
-    // assemble pieces together
-    let mut builder = ArrayData::builder(data_type)
-        .len(len)
-        .add_buffer(sliced_key_buffer);
-    builder = builder.add_child_data(dict_vals.to_data());
-    Arc::new(DictionaryArray::<UInt16Type>::from(
-        builder.build().unwrap(),
-    ))
-}
-
-fn create_output_array(
-    key_buffer_cache: &mut ZeroBufferGenerators,
-    val: &ScalarValue,
-    len: usize,
-) -> ArrayRef {
-    if let ScalarValue::Dictionary(key_type, dict_val) = &val {
-        match key_type.as_ref() {
-            DataType::Int8 => {
-                return create_dict_array(
-                    &mut key_buffer_cache.gen_i8,
-                    dict_val,
-                    len,
-                    val.get_datatype(),
-                );
-            }
-            DataType::Int16 => {
-                return create_dict_array(
-                    &mut key_buffer_cache.gen_i16,
-                    dict_val,
-                    len,
-                    val.get_datatype(),
-                );
-            }
-            DataType::Int32 => {
-                return create_dict_array(
-                    &mut key_buffer_cache.gen_i32,
-                    dict_val,
-                    len,
-                    val.get_datatype(),
-                );
-            }
-            DataType::Int64 => {
-                return create_dict_array(
-                    &mut key_buffer_cache.gen_i64,
-                    dict_val,
-                    len,
-                    val.get_datatype(),
-                );
-            }
-            DataType::UInt8 => {
-                return create_dict_array(
-                    &mut key_buffer_cache.gen_u8,
-                    dict_val,
-                    len,
-                    val.get_datatype(),
-                );
-            }
-            DataType::UInt16 => {
-                return create_dict_array(
-                    &mut key_buffer_cache.gen_u16,
-                    dict_val,
-                    len,
-                    val.get_datatype(),
-                );
-            }
-            DataType::UInt32 => {
-                return create_dict_array(
-                    &mut key_buffer_cache.gen_u32,
-                    dict_val,
-                    len,
-                    val.get_datatype(),
-                );
-            }
-            DataType::UInt64 => {
-                return create_dict_array(
-                    &mut key_buffer_cache.gen_u64,
-                    dict_val,
-                    len,
-                    val.get_datatype(),
-                );
-            }
-            _ => {}
-        }
-    }
-
-    val.to_array_of_size(len)
-}
-
 /// A single file or part of a file that should be read, along with its schema, statistics
 pub struct FileMeta {
     /// Path for the file (e.g. URL, filesystem path, etc)
@@ -980,271 +508,13 @@ mod tests {
         BinaryArray, BooleanArray, Float32Array, Int32Array, Int64Array, StringArray,
         UInt64Array,
     };
+    use arrow_schema::Field;
     use chrono::Utc;
 
     use crate::physical_plan::{DefaultDisplay, VerboseDisplay};
-    use crate::{
-        test::{build_table_i32, columns},
-        test_util::aggr_test_schema,
-    };
 
     use super::*;
 
-    #[test]
-    fn physical_plan_config_no_projection() {
-        let file_schema = aggr_test_schema();
-        let conf = config_for_projection(
-            Arc::clone(&file_schema),
-            None,
-            Statistics::default(),
-            vec![(
-                "date".to_owned(),
-                wrap_partition_type_in_dict(DataType::Utf8),
-            )],
-        );
-
-        let (proj_schema, proj_statistics, _) = conf.project();
-        assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
-        assert_eq!(
-            proj_schema.field(file_schema.fields().len()).name(),
-            "date",
-            "partition columns are the last columns"
-        );
-        assert_eq!(
-            proj_statistics
-                .column_statistics
-                .expect("projection creates column statistics")
-                .len(),
-            file_schema.fields().len() + 1
-        );
-        // TODO implement tests for partition column statistics once implemented
-
-        let col_names = conf.projected_file_column_names();
-        assert_eq!(col_names, None);
-
-        let col_indices = conf.file_column_projection_indices();
-        assert_eq!(col_indices, None);
-    }
-
-    #[test]
-    fn physical_plan_config_with_projection() {
-        let file_schema = aggr_test_schema();
-        let conf = config_for_projection(
-            Arc::clone(&file_schema),
-            Some(vec![file_schema.fields().len(), 0]),
-            Statistics {
-                num_rows: Some(10),
-                // assign the column index to distinct_count to help assert
-                // the source statistic after the projection
-                column_statistics: Some(
-                    (0..file_schema.fields().len())
-                        .map(|i| ColumnStatistics {
-                            distinct_count: Some(i),
-                            ..Default::default()
-                        })
-                        .collect(),
-                ),
-                ..Default::default()
-            },
-            vec![(
-                "date".to_owned(),
-                wrap_partition_type_in_dict(DataType::Utf8),
-            )],
-        );
-
-        let (proj_schema, proj_statistics, _) = conf.project();
-        assert_eq!(
-            columns(&proj_schema),
-            vec!["date".to_owned(), "c1".to_owned()]
-        );
-        let proj_stat_cols = proj_statistics
-            .column_statistics
-            .expect("projection creates column statistics");
-        assert_eq!(proj_stat_cols.len(), 2);
-        // TODO implement tests for proj_stat_cols[0] once partition column
-        // statistics are implemented
-        assert_eq!(proj_stat_cols[1].distinct_count, Some(0));
-
-        let col_names = conf.projected_file_column_names();
-        assert_eq!(col_names, Some(vec!["c1".to_owned()]));
-
-        let col_indices = conf.file_column_projection_indices();
-        assert_eq!(col_indices, Some(vec![0]));
-    }
-
-    #[test]
-    fn partition_column_projector() {
-        let file_batch = build_table_i32(
-            ("a", &vec![0, 1, 2]),
-            ("b", &vec![-2, -1, 0]),
-            ("c", &vec![10, 11, 12]),
-        );
-        let partition_cols = vec![
-            (
-                "year".to_owned(),
-                wrap_partition_type_in_dict(DataType::Utf8),
-            ),
-            (
-                "month".to_owned(),
-                wrap_partition_type_in_dict(DataType::Utf8),
-            ),
-            (
-                "day".to_owned(),
-                wrap_partition_type_in_dict(DataType::Utf8),
-            ),
-        ];
-        // create a projected schema
-        let conf = config_for_projection(
-            file_batch.schema(),
-            // keep all cols from file and 2 from partitioning
-            Some(vec![
-                0,
-                1,
-                2,
-                file_batch.schema().fields().len(),
-                file_batch.schema().fields().len() + 2,
-            ]),
-            Statistics::default(),
-            partition_cols.clone(),
-        );
-        let (proj_schema, ..) = conf.project();
-        // created a projector for that projected schema
-        let mut proj = PartitionColumnProjector::new(
-            proj_schema,
-            &partition_cols
-                .iter()
-                .map(|x| x.0.clone())
-                .collect::<Vec<_>>(),
-        );
-
-        // project first batch
-        let projected_batch = proj
-            .project(
-                // file_batch is ok here because we kept all the file cols in the projection
-                file_batch,
-                &[
-                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
-                        "2021".to_owned(),
-                    ))),
-                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
-                        "10".to_owned(),
-                    ))),
-                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
-                        "26".to_owned(),
-                    ))),
-                ],
-            )
-            .expect("Projection of partition columns into record batch failed");
-        let expected = vec![
-            "+---+----+----+------+-----+",
-            "| a | b  | c  | year | day |",
-            "+---+----+----+------+-----+",
-            "| 0 | -2 | 10 | 2021 | 26  |",
-            "| 1 | -1 | 11 | 2021 | 26  |",
-            "| 2 | 0  | 12 | 2021 | 26  |",
-            "+---+----+----+------+-----+",
-        ];
-        crate::assert_batches_eq!(expected, &[projected_batch]);
-
-        // project another batch that is larger than the previous one
-        let file_batch = build_table_i32(
-            ("a", &vec![5, 6, 7, 8, 9]),
-            ("b", &vec![-10, -9, -8, -7, -6]),
-            ("c", &vec![12, 13, 14, 15, 16]),
-        );
-        let projected_batch = proj
-            .project(
-                // file_batch is ok here because we kept all the file cols in the projection
-                file_batch,
-                &[
-                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
-                        "2021".to_owned(),
-                    ))),
-                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
-                        "10".to_owned(),
-                    ))),
-                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
-                        "27".to_owned(),
-                    ))),
-                ],
-            )
-            .expect("Projection of partition columns into record batch failed");
-        let expected = vec![
-            "+---+-----+----+------+-----+",
-            "| a | b   | c  | year | day |",
-            "+---+-----+----+------+-----+",
-            "| 5 | -10 | 12 | 2021 | 27  |",
-            "| 6 | -9  | 13 | 2021 | 27  |",
-            "| 7 | -8  | 14 | 2021 | 27  |",
-            "| 8 | -7  | 15 | 2021 | 27  |",
-            "| 9 | -6  | 16 | 2021 | 27  |",
-            "+---+-----+----+------+-----+",
-        ];
-        crate::assert_batches_eq!(expected, &[projected_batch]);
-
-        // project another batch that is smaller than the previous one
-        let file_batch = build_table_i32(
-            ("a", &vec![0, 1, 3]),
-            ("b", &vec![2, 3, 4]),
-            ("c", &vec![4, 5, 6]),
-        );
-        let projected_batch = proj
-            .project(
-                // file_batch is ok here because we kept all the file cols in the projection
-                file_batch,
-                &[
-                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
-                        "2021".to_owned(),
-                    ))),
-                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
-                        "10".to_owned(),
-                    ))),
-                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(
-                        "28".to_owned(),
-                    ))),
-                ],
-            )
-            .expect("Projection of partition columns into record batch failed");
-        let expected = vec![
-            "+---+---+---+------+-----+",
-            "| a | b | c | year | day |",
-            "+---+---+---+------+-----+",
-            "| 0 | 2 | 4 | 2021 | 28  |",
-            "| 1 | 3 | 5 | 2021 | 28  |",
-            "| 3 | 4 | 6 | 2021 | 28  |",
-            "+---+---+---+------+-----+",
-        ];
-        crate::assert_batches_eq!(expected, &[projected_batch]);
-
-        // forgot to dictionary-wrap the scalar value
-        let file_batch = build_table_i32(
-            ("a", &vec![0, 1, 2]),
-            ("b", &vec![-2, -1, 0]),
-            ("c", &vec![10, 11, 12]),
-        );
-        let projected_batch = proj
-            .project(
-                // file_batch is ok here because we kept all the file cols in the projection
-                file_batch,
-                &[
-                    ScalarValue::Utf8(Some("2021".to_owned())),
-                    ScalarValue::Utf8(Some("10".to_owned())),
-                    ScalarValue::Utf8(Some("26".to_owned())),
-                ],
-            )
-            .expect("Projection of partition columns into record batch failed");
-        let expected = vec![
-            "+---+----+----+------+-----+",
-            "| a | b  | c  | year | day |",
-            "+---+----+----+------+-----+",
-            "| 0 | -2 | 10 | 2021 | 26  |",
-            "| 1 | -1 | 11 | 2021 | 26  |",
-            "| 2 | 0  | 12 | 2021 | 26  |",
-            "+---+----+----+------+-----+",
-        ];
-        crate::assert_batches_eq!(expected, &[projected_batch]);
-    }
-
     #[test]
     fn schema_mapping_map_batch() {
         let table_schema = Arc::new(Schema::new(vec![
@@ -1361,26 +631,6 @@ mod tests {
         assert_eq!(c4.value(2), 3.0_f32);
     }
 
-    // sets default for configs that play no role in projections
-    fn config_for_projection(
-        file_schema: SchemaRef,
-        projection: Option<Vec<usize>>,
-        statistics: Statistics,
-        table_partition_cols: Vec<(String, DataType)>,
-    ) -> FileScanConfig {
-        FileScanConfig {
-            file_schema,
-            file_groups: vec![vec![]],
-            limit: None,
-            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-            projection,
-            statistics,
-            table_partition_cols,
-            output_ordering: vec![],
-            infinite_source: false,
-        }
-    }
-
     #[test]
     fn file_groups_display_empty() {
         let expected = "{0 groups: []}";
@@ -1533,6 +783,9 @@ mod tests {
 
     /// Unit tests for `repartition_file_groups()`
     mod repartition_file_groups_test {
+        use datafusion_common::Statistics;
+        use itertools::Itertools;
+
         use super::*;
 
         /// Empty file won't get partitioned
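
For reference, the `config_for_projection` helper removed above exercises every
field of `FileScanConfig` as of this commit. Below is a minimal standalone
sketch of the same construction, assuming the crate paths shown in this diff
(`wrap_partition_type_in_dict` is assumed to be re-exported from
`datafusion::datasource::physical_plan`; field names and types may change in
later releases):

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{wrap_partition_type_in_dict, FileScanConfig};
use datafusion_common::Statistics;

fn example_scan_config() -> FileScanConfig {
    // Hypothetical single-column file schema, for illustration only.
    let file_schema = Arc::new(Schema::new(vec![Field::new(
        "c1",
        DataType::Utf8,
        false,
    )]));
    FileScanConfig {
        file_schema,
        file_groups: vec![vec![]],
        limit: None,
        object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
        // Projection indices cover the file columns first, then the
        // partition columns, so index 1 selects the "date" partition
        // column and index 0 selects "c1".
        projection: Some(vec![1, 0]),
        statistics: Statistics::default(),
        // Partition columns are dictionary-encoded via
        // `wrap_partition_type_in_dict`, matching the tests moved above.
        table_partition_cols: vec![(
            "date".to_owned(),
            wrap_partition_type_in_dict(DataType::Utf8),
        )],
        output_ordering: vec![],
        infinite_source: false,
    }
}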
