This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 212f4245c9 Move `FileFormat` and related pieces to `datafusion-datasource` (#14873)
212f4245c9 is described below

commit 212f4245c98fb60929bd544545432ae9235e5b35
Author: Adam Gutglick <[email protected]>
AuthorDate: Wed Feb 26 11:56:24 2025 +0000

    Move `FileFormat` and related pieces to `datafusion-datasource` (#14873)
    
    * initial work
    
    * Trigger Build
    
    * restore pub crate
    
    * move things into parquet
    
    * Fix docs references
---
 datafusion/core/src/datasource/file_format/json.rs |   3 +-
 datafusion/core/src/datasource/file_format/mod.rs  | 394 ++-------------------
 .../core/src/datasource/file_format/parquet.rs     | 154 +++++++-
 .../src/datasource/physical_plan/parquet/opener.rs |   2 +-
 datafusion/datasource/src/file_format.rs           | 227 ++++++++++++
 datafusion/datasource/src/mod.rs                   |   1 +
 6 files changed, 405 insertions(+), 376 deletions(-)
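
For downstream consumers, the practical effect of this move is a new canonical
home for the `FileFormat` abstractions. A minimal sketch (assuming a direct
dependency on the `datafusion-datasource` crate; the re-export shown in the
mod.rs diff below keeps the old `datafusion::datasource::file_format` paths
working):

    use datafusion_datasource::file_format::{
        FileFormat, FileFormatFactory, DEFAULT_SCHEMA_INFER_MAX_RECORD,
    };

    fn main() {
        // The schema-inference default moved along with the traits.
        assert_eq!(DEFAULT_SCHEMA_INFER_MAX_RECORD, 1000);
    }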

diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 7a5aaf7c64..1a2aaf3af8 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -26,7 +26,7 @@ use std::sync::Arc;
 
 use super::write::orchestration::spawn_writer_tasks_and_join;
 use super::{
-    Decoder, DecoderDeserializer, FileFormat, FileFormatFactory, FileScanConfig,
+    Decoder, DecoderDeserializer, FileFormat, FileFormatFactory,
     DEFAULT_SCHEMA_INFER_MAX_RECORD,
 };
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -52,6 +52,7 @@ use datafusion_common::{not_impl_err, GetExt, DEFAULT_JSON_EXTENSION};
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_datasource::display::FileGroupDisplay;
 use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_execution::TaskContext;
 use datafusion_expr::dml::InsertOp;
 use datafusion_physical_expr::PhysicalExpr;
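
The hunk above is representative of the import churn in this commit: json.rs
previously reached `FileScanConfig` through a private re-import in its parent
module (`use super::{...}`) and now names the type's home crate directly. A
sketch of the updated style:

    // Before: pulled in via `super::` from the file_format parent module.
    // After: imported straight from the datasource crate.
    use datafusion_datasource::file_scan_config::FileScanConfig;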
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 2b46748d0a..5dbf4957a4 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -18,9 +18,6 @@
 //! Module containing helper methods for the various file formats
 //! See write.rs for write related helper methods
 
-/// Default max records to scan to infer the schema
-pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
-
 pub mod arrow;
 pub mod avro;
 pub mod csv;
@@ -28,154 +25,21 @@ pub mod json;
 pub mod options;
 #[cfg(feature = "parquet")]
 pub mod parquet;
-use datafusion_datasource::file::FileSource;
+
+use ::arrow::array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Buf;
+use bytes::Bytes;
+use datafusion_common::Result;
 pub use datafusion_datasource::file_compression_type;
-use datafusion_datasource::file_scan_config::FileScanConfig;
+pub use datafusion_datasource::file_format::*;
 pub use datafusion_datasource::write;
-
-use std::any::Any;
-use std::collections::{HashMap, VecDeque};
-use std::fmt::{self, Debug, Display};
-use std::sync::Arc;
-use std::task::Poll;
-
-use crate::arrow::array::RecordBatch;
-use crate::arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
-use crate::arrow::error::ArrowError;
-use crate::datasource::physical_plan::FileSinkConfig;
-use crate::error::Result;
-use crate::physical_plan::{ExecutionPlan, Statistics};
-
-use datafusion_catalog::Session;
-use datafusion_common::file_options::file_type::FileType;
-use datafusion_common::{internal_err, not_impl_err, GetExt};
-use datafusion_expr::Expr;
-use datafusion_physical_expr::PhysicalExpr;
-
-use async_trait::async_trait;
-use bytes::{Buf, Bytes};
-use datafusion_physical_expr_common::sort_expr::LexRequirement;
-use file_compression_type::FileCompressionType;
 use futures::stream::BoxStream;
-use futures::{ready, Stream, StreamExt};
-use object_store::{ObjectMeta, ObjectStore};
-
-/// Factory for creating [`FileFormat`] instances based on session and command level options
-///
-/// Users can provide their own `FileFormatFactory` to support arbitrary file formats
-pub trait FileFormatFactory: Sync + Send + GetExt + Debug {
-    /// Initialize a [FileFormat] and configure based on session and command level options
-    fn create(
-        &self,
-        state: &dyn Session,
-        format_options: &HashMap<String, String>,
-    ) -> Result<Arc<dyn FileFormat>>;
-
-    /// Initialize a [FileFormat] with all options set to default values
-    fn default(&self) -> Arc<dyn FileFormat>;
-
-    /// Returns the table source as [`Any`] so that it can be
-    /// downcast to a specific implementation.
-    fn as_any(&self) -> &dyn Any;
-}
-
-/// This trait abstracts all the file format specific implementations
-/// from the [`TableProvider`]. This helps code re-utilization across
-/// providers that support the same file formats.
-///
-/// [`TableProvider`]: crate::catalog::TableProvider
-#[async_trait]
-pub trait FileFormat: Send + Sync + Debug {
-    /// Returns the table provider as [`Any`](std::any::Any) so that it can be
-    /// downcast to a specific implementation.
-    fn as_any(&self) -> &dyn Any;
-
-    /// Returns the extension for this FileFormat, e.g. "file.csv" -> csv
-    fn get_ext(&self) -> String;
-
-    /// Returns the extension for this FileFormat when compressed, e.g. "file.csv.gz" -> csv
-    fn get_ext_with_compression(
-        &self,
-        _file_compression_type: &FileCompressionType,
-    ) -> Result<String>;
-
-    /// Infer the common schema of the provided objects. The objects will usually
-    /// be analysed up to a given number of records or files (as specified in the
-    /// format config) then give the estimated common schema. This might fail if
-    /// the files have schemas that cannot be merged.
-    async fn infer_schema(
-        &self,
-        state: &dyn Session,
-        store: &Arc<dyn ObjectStore>,
-        objects: &[ObjectMeta],
-    ) -> Result<SchemaRef>;
-
-    /// Infer the statistics for the provided object. The cost and accuracy of the
-    /// estimated statistics might vary greatly between file formats.
-    ///
-    /// `table_schema` is the (combined) schema of the overall table
-    /// and may be a superset of the schema contained in this file.
-    ///
-    /// TODO: should the file source return statistics for only columns referred to in the table schema?
-    async fn infer_stats(
-        &self,
-        state: &dyn Session,
-        store: &Arc<dyn ObjectStore>,
-        table_schema: SchemaRef,
-        object: &ObjectMeta,
-    ) -> Result<Statistics>;
-
-    /// Take a list of files and convert it to the appropriate executor
-    /// according to this file format.
-    async fn create_physical_plan(
-        &self,
-        state: &dyn Session,
-        conf: FileScanConfig,
-        filters: Option<&Arc<dyn PhysicalExpr>>,
-    ) -> Result<Arc<dyn ExecutionPlan>>;
-
-    /// Take a list of files and the configuration to convert it to the
-    /// appropriate writer executor according to this file format.
-    async fn create_writer_physical_plan(
-        &self,
-        _input: Arc<dyn ExecutionPlan>,
-        _state: &dyn Session,
-        _conf: FileSinkConfig,
-        _order_requirements: Option<LexRequirement>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        not_impl_err!("Writer not implemented for this format")
-    }
-
-    /// Check if the specified file format has support for pushing down the provided filters within
-    /// the given schemas. Added initially to support the Parquet file format's ability to do this.
-    fn supports_filters_pushdown(
-        &self,
-        _file_schema: &Schema,
-        _table_schema: &Schema,
-        _filters: &[&Expr],
-    ) -> Result<FilePushdownSupport> {
-        Ok(FilePushdownSupport::NoSupport)
-    }
-
-    /// Return the related FileSource such as `CsvSource`, `JsonSource`, etc.
-    fn file_source(&self) -> Arc<dyn FileSource>;
-}
-
-/// An enum to distinguish between different states when determining if certain filters can be
-/// pushed down to file scanning
-#[derive(Debug, PartialEq)]
-pub enum FilePushdownSupport {
-    /// The file format/system being asked does not support any sort of pushdown. This should be
-    /// used even if the file format theoretically supports some sort of pushdown, but it's not
-    /// enabled or implemented yet.
-    NoSupport,
-    /// The file format/system being asked *does* support pushdown, but it can't make it work for
-    /// the provided filter/expression
-    NotSupportedForFilter,
-    /// The file format/system being asked *does* support pushdown and *can* make it work for the
-    /// provided filter/expression
-    Supported,
-}
+use futures::StreamExt as _;
+use futures::{ready, Stream};
+use std::collections::VecDeque;
+use std::fmt;
+use std::task::Poll;
 
 /// Possible outputs of a [`BatchDeserializer`].
 #[derive(Debug, PartialEq)]
@@ -191,7 +55,7 @@ pub enum DeserializerOutput {
 /// Trait defining a scheme for deserializing byte streams into structured data.
 /// Implementors of this trait are responsible for converting raw bytes into
 /// `RecordBatch` objects.
-pub trait BatchDeserializer<T>: Send + Debug {
+pub trait BatchDeserializer<T>: Send + fmt::Debug {
     /// Feeds a message for deserialization, updating the internal state of
     /// this `BatchDeserializer`. Note that one can call this function multiple
     /// times before calling `next`, which will queue multiple messages for
@@ -217,7 +81,7 @@ pub trait BatchDeserializer<T>: Send + Debug {
 /// [`arrow::csv::reader::Decoder`]: ::arrow::csv::reader::Decoder
 /// [`Decoder::decode`]: ::arrow::json::reader::Decoder::decode
 /// [`Decoder::flush`]: ::arrow::json::reader::Decoder::flush
-pub(crate) trait Decoder: Send + Debug {
+pub(crate) trait Decoder: Send + fmt::Debug {
     /// See [`arrow::json::reader::Decoder::decode`].
     ///
     /// [`arrow::json::reader::Decoder::decode`]: ::arrow::json::reader::Decoder::decode
@@ -232,7 +96,7 @@ pub(crate) trait Decoder: Send + Debug {
     fn can_flush_early(&self) -> bool;
 }
 
-impl<T: Decoder> Debug for DecoderDeserializer<T> {
+impl<T: Decoder> fmt::Debug for DecoderDeserializer<T> {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("Deserializer")
             .field("buffered_queue", &self.buffered_queue)
@@ -302,7 +166,7 @@ pub(crate) struct DecoderDeserializer<T: Decoder> {
 
 impl<T: Decoder> DecoderDeserializer<T> {
     /// Creates a new `DecoderDeserializer` with the provided decoder.
-    pub(crate) fn new(decoder: T) -> Self {
+    pub fn new(decoder: T) -> Self {
         DecoderDeserializer {
             decoder,
             buffered_queue: VecDeque::new(),
@@ -336,237 +200,29 @@ pub(crate) fn deserialize_stream<'a>(
     .boxed()
 }
 
-/// A container of [FileFormatFactory] which also implements [FileType].
-/// This enables converting a dyn FileFormat to a dyn FileType.
-/// The former trait is a superset of the latter trait, which includes execution time
-/// relevant methods. [FileType] is only used in logical planning and only implements
-/// the subset of methods required during logical planning.
-#[derive(Debug)]
-pub struct DefaultFileType {
-    file_format_factory: Arc<dyn FileFormatFactory>,
-}
-
-impl DefaultFileType {
-    /// Constructs a [DefaultFileType] wrapper from a [FileFormatFactory]
-    pub fn new(file_format_factory: Arc<dyn FileFormatFactory>) -> Self {
-        Self {
-            file_format_factory,
-        }
-    }
-
-    /// get a reference to the inner [FileFormatFactory] struct
-    pub fn as_format_factory(&self) -> &Arc<dyn FileFormatFactory> {
-        &self.file_format_factory
-    }
-}
-
-impl FileType for DefaultFileType {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-}
-
-impl Display for DefaultFileType {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "{:?}", self.file_format_factory)
-    }
-}
-
-impl GetExt for DefaultFileType {
-    fn get_ext(&self) -> String {
-        self.file_format_factory.get_ext()
-    }
-}
-
-/// Converts a [FileFormatFactory] to a [FileType]
-pub fn format_as_file_type(
-    file_format_factory: Arc<dyn FileFormatFactory>,
-) -> Arc<dyn FileType> {
-    Arc::new(DefaultFileType {
-        file_format_factory,
-    })
-}
-
-/// Converts a [FileType] to a [FileFormatFactory].
-/// Returns an error if the [FileType] cannot be
-/// downcasted to a [DefaultFileType].
-pub fn file_type_to_format(
-    file_type: &Arc<dyn FileType>,
-) -> Result<Arc<dyn FileFormatFactory>> {
-    match file_type
-        .as_ref()
-        .as_any()
-        .downcast_ref::<DefaultFileType>()
-    {
-        Some(source) => Ok(Arc::clone(&source.file_format_factory)),
-        _ => internal_err!("FileType was not DefaultFileType"),
-    }
-}
-
-/// Create a new field with the specified data type, copying the other
-/// properties from the input field
-fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
-    Arc::new(field.as_ref().clone().with_data_type(new_type))
-}
-
-/// Transform a schema to use view types for Utf8 and Binary
-///
-/// See [parquet::ParquetFormat::force_view_types] for details
-pub fn transform_schema_to_view(schema: &Schema) -> Schema {
-    let transformed_fields: Vec<Arc<Field>> = schema
-        .fields
-        .iter()
-        .map(|field| match field.data_type() {
-            DataType::Utf8 | DataType::LargeUtf8 => {
-                field_with_new_type(field, DataType::Utf8View)
-            }
-            DataType::Binary | DataType::LargeBinary => {
-                field_with_new_type(field, DataType::BinaryView)
-            }
-            _ => Arc::clone(field),
-        })
-        .collect();
-    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
-}
-
-/// Coerces the file schema if the table schema uses a view type.
-#[cfg(not(target_arch = "wasm32"))]
-pub(crate) fn coerce_file_schema_to_view_type(
-    table_schema: &Schema,
-    file_schema: &Schema,
-) -> Option<Schema> {
-    let mut transform = false;
-    let table_fields: HashMap<_, _> = table_schema
-        .fields
-        .iter()
-        .map(|f| {
-            let dt = f.data_type();
-            if dt.equals_datatype(&DataType::Utf8View)
-                || dt.equals_datatype(&DataType::BinaryView)
-            {
-                transform = true;
-            }
-            (f.name(), dt)
-        })
-        .collect();
-
-    if !transform {
-        return None;
-    }
-
-    let transformed_fields: Vec<Arc<Field>> = file_schema
-        .fields
-        .iter()
-        .map(
-            |field| match (table_fields.get(field.name()), field.data_type()) {
-                (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
-                    field_with_new_type(field, DataType::Utf8View)
-                }
-                (
-                    Some(DataType::BinaryView),
-                    DataType::Binary | DataType::LargeBinary,
-                ) => field_with_new_type(field, DataType::BinaryView),
-                _ => Arc::clone(field),
-            },
-        )
-        .collect();
-
-    Some(Schema::new_with_metadata(
-        transformed_fields,
-        file_schema.metadata.clone(),
-    ))
-}
-
-/// Transform a schema so that any binary types are strings
-pub fn transform_binary_to_string(schema: &Schema) -> Schema {
-    let transformed_fields: Vec<Arc<Field>> = schema
-        .fields
-        .iter()
-        .map(|field| match field.data_type() {
-            DataType::Binary => field_with_new_type(field, DataType::Utf8),
-            DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8),
-            DataType::BinaryView => field_with_new_type(field, DataType::Utf8View),
-            _ => Arc::clone(field),
-        })
-        .collect();
-    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
-}
-
-/// If the table schema uses a string type, coerce the file schema to use a string type.
-///
-/// See [parquet::ParquetFormat::binary_as_string] for details
-#[cfg(not(target_arch = "wasm32"))]
-pub(crate) fn coerce_file_schema_to_string_type(
-    table_schema: &Schema,
-    file_schema: &Schema,
-) -> Option<Schema> {
-    let mut transform = false;
-    let table_fields: HashMap<_, _> = table_schema
-        .fields
-        .iter()
-        .map(|f| (f.name(), f.data_type()))
-        .collect();
-    let transformed_fields: Vec<Arc<Field>> = file_schema
-        .fields
-        .iter()
-        .map(
-            |field| match (table_fields.get(field.name()), field.data_type()) {
-                // table schema uses string type, coerce the file schema to use string type
-                (
-                    Some(DataType::Utf8),
-                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
-                ) => {
-                    transform = true;
-                    field_with_new_type(field, DataType::Utf8)
-                }
-                // table schema uses large string type, coerce the file schema to use large string type
-                (
-                    Some(DataType::LargeUtf8),
-                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
-                ) => {
-                    transform = true;
-                    field_with_new_type(field, DataType::LargeUtf8)
-                }
-                // table schema uses string view type, coerce the file schema to use view type
-                (
-                    Some(DataType::Utf8View),
-                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
-                ) => {
-                    transform = true;
-                    field_with_new_type(field, DataType::Utf8View)
-                }
-                _ => Arc::clone(field),
-            },
-        )
-        .collect();
-
-    if !transform {
-        None
-    } else {
-        Some(Schema::new_with_metadata(
-            transformed_fields,
-            file_schema.metadata.clone(),
-        ))
-    }
-}
-
 #[cfg(test)]
 pub(crate) mod test_util {
+    use std::fmt::{self, Display};
     use std::ops::Range;
-    use std::sync::Mutex;
+    use std::sync::{Arc, Mutex};
 
-    use super::*;
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::test::object_store::local_unpartitioned_file;
+    use async_trait::async_trait;
     use bytes::Bytes;
+    use datafusion_catalog::Session;
+    use datafusion_common::Result;
+    use datafusion_datasource::file_format::FileFormat;
+    use datafusion_datasource::file_scan_config::FileScanConfig;
+    use datafusion_physical_plan::ExecutionPlan;
     use futures::stream::BoxStream;
     use futures::StreamExt;
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
     use object_store::{
         Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload,
-        PutMultipartOpts, PutOptions, PutPayload, PutResult,
+        ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult,
     };
 
     pub async fn scan_format(
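
Because mod.rs now contains `pub use datafusion_datasource::file_format::*;`,
code written against the old paths should keep compiling. A minimal sketch of
the unchanged call side, assuming the `datafusion` core crate:

    // The old import path still resolves via the new wildcard re-export.
    use datafusion::datasource::file_format::FileFormat;

    fn extension_of(format: &dyn FileFormat) -> String {
        // The trait surface (get_ext, infer_schema, create_physical_plan, ...)
        // is unchanged; only its home crate moved.
        format.get_ext()
    }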
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index e9ecff7baf..98aa24ad00 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -25,11 +25,7 @@ use std::sync::Arc;
 
 use super::write::demux::DemuxedStreamReceiver;
 use super::write::{create_writer, SharedBuffer};
-use super::{
-    coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
-    transform_binary_to_string, transform_schema_to_view, FileFormat, FileFormatFactory,
-    FilePushdownSupport,
-};
+use super::{FileFormat, FileFormatFactory, FilePushdownSupport};
 use crate::arrow::array::RecordBatch;
 use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -47,6 +43,7 @@ use crate::physical_plan::{
 };
 
 use arrow::compute::sum;
+use arrow_schema::{DataType, Field, FieldRef};
 use datafusion_catalog::Session;
 use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
 use datafusion_common::parsers::CompressionTypeVariant;
@@ -471,6 +468,153 @@ impl FileFormat for ParquetFormat {
     }
 }
 
+/// Coerces the file schema if the table schema uses a view type.
+#[cfg(not(target_arch = "wasm32"))]
+pub fn coerce_file_schema_to_view_type(
+    table_schema: &Schema,
+    file_schema: &Schema,
+) -> Option<Schema> {
+    let mut transform = false;
+    let table_fields: HashMap<_, _> = table_schema
+        .fields
+        .iter()
+        .map(|f| {
+            let dt = f.data_type();
+            if dt.equals_datatype(&DataType::Utf8View)
+                || dt.equals_datatype(&DataType::BinaryView)
+            {
+                transform = true;
+            }
+            (f.name(), dt)
+        })
+        .collect();
+
+    if !transform {
+        return None;
+    }
+
+    let transformed_fields: Vec<Arc<Field>> = file_schema
+        .fields
+        .iter()
+        .map(
+            |field| match (table_fields.get(field.name()), field.data_type()) {
+                (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
+                    field_with_new_type(field, DataType::Utf8View)
+                }
+                (
+                    Some(DataType::BinaryView),
+                    DataType::Binary | DataType::LargeBinary,
+                ) => field_with_new_type(field, DataType::BinaryView),
+                _ => Arc::clone(field),
+            },
+        )
+        .collect();
+
+    Some(Schema::new_with_metadata(
+        transformed_fields,
+        file_schema.metadata.clone(),
+    ))
+}
+
+/// If the table schema uses a string type, coerce the file schema to use a string type.
+///
+/// See [ParquetFormat::binary_as_string] for details
+#[cfg(not(target_arch = "wasm32"))]
+pub fn coerce_file_schema_to_string_type(
+    table_schema: &Schema,
+    file_schema: &Schema,
+) -> Option<Schema> {
+    let mut transform = false;
+    let table_fields: HashMap<_, _> = table_schema
+        .fields
+        .iter()
+        .map(|f| (f.name(), f.data_type()))
+        .collect();
+    let transformed_fields: Vec<Arc<Field>> = file_schema
+        .fields
+        .iter()
+        .map(
+            |field| match (table_fields.get(field.name()), field.data_type()) {
+                // table schema uses string type, coerce the file schema to use string type
+                (
+                    Some(DataType::Utf8),
+                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
+                ) => {
+                    transform = true;
+                    field_with_new_type(field, DataType::Utf8)
+                }
+                // table schema uses large string type, coerce the file schema to use large string type
+                (
+                    Some(DataType::LargeUtf8),
+                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
+                ) => {
+                    transform = true;
+                    field_with_new_type(field, DataType::LargeUtf8)
+                }
+                // table schema uses string view type, coerce the file schema to use view type
+                (
+                    Some(DataType::Utf8View),
+                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
+                ) => {
+                    transform = true;
+                    field_with_new_type(field, DataType::Utf8View)
+                }
+                _ => Arc::clone(field),
+            },
+        )
+        .collect();
+
+    if !transform {
+        None
+    } else {
+        Some(Schema::new_with_metadata(
+            transformed_fields,
+            file_schema.metadata.clone(),
+        ))
+    }
+}
+
+/// Create a new field with the specified data type, copying the other
+/// properties from the input field
+fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
+    Arc::new(field.as_ref().clone().with_data_type(new_type))
+}
+
+/// Transform a schema to use view types for Utf8 and Binary
+///
+/// See [ParquetFormat::force_view_types] for details
+pub fn transform_schema_to_view(schema: &Schema) -> Schema {
+    let transformed_fields: Vec<Arc<Field>> = schema
+        .fields
+        .iter()
+        .map(|field| match field.data_type() {
+            DataType::Utf8 | DataType::LargeUtf8 => {
+                field_with_new_type(field, DataType::Utf8View)
+            }
+            DataType::Binary | DataType::LargeBinary => {
+                field_with_new_type(field, DataType::BinaryView)
+            }
+            _ => Arc::clone(field),
+        })
+        .collect();
+    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
+}
+
+/// Transform a schema so that any binary types are strings
+pub fn transform_binary_to_string(schema: &Schema) -> Schema {
+    let transformed_fields: Vec<Arc<Field>> = schema
+        .fields
+        .iter()
+        .map(|field| match field.data_type() {
+            DataType::Binary => field_with_new_type(field, DataType::Utf8),
+            DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8),
+            DataType::BinaryView => field_with_new_type(field, DataType::Utf8View),
+            _ => Arc::clone(field),
+        })
+        .collect();
+    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
+}
+
 /// [`MetadataFetch`] adapter for reading bytes from an [`ObjectStore`]
 struct ObjectStoreFetch<'a> {
     store: &'a dyn ObjectStore,
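
The schema helpers above are now public in the parquet module rather than
private in file_format/mod.rs. A minimal sketch of what
`transform_schema_to_view` does (assuming the core crate with the `parquet`
feature enabled, matching the `#[cfg(feature = "parquet")]` gate in mod.rs):

    use arrow_schema::{DataType, Field, Schema};
    use datafusion::datasource::file_format::parquet::transform_schema_to_view;

    fn main() {
        let schema = Schema::new(vec![
            Field::new("name", DataType::Utf8, true),
            Field::new("payload", DataType::LargeBinary, true),
            Field::new("id", DataType::Int64, false),
        ]);
        let viewed = transform_schema_to_view(&schema);
        // Utf8/LargeUtf8 become Utf8View; Binary/LargeBinary become BinaryView.
        assert_eq!(viewed.field(0).data_type(), &DataType::Utf8View);
        assert_eq!(viewed.field(1).data_type(), &DataType::BinaryView);
        // All other types are cloned unchanged.
        assert_eq!(viewed.field(2).data_type(), &DataType::Int64);
    }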
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index 138b448979..4230a1bdce 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -19,7 +19,7 @@
 
 use std::sync::Arc;
 
-use crate::datasource::file_format::{
+use crate::datasource::file_format::parquet::{
     coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
 };
 use crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter;
diff --git a/datafusion/datasource/src/file_format.rs b/datafusion/datasource/src/file_format.rs
new file mode 100644
index 0000000000..aa0338fab7
--- /dev/null
+++ b/datafusion/datasource/src/file_format.rs
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module containing helper methods for the various file formats
+//! See write.rs for write related helper methods
+
+use std::any::Any;
+use std::collections::HashMap;
+use std::fmt;
+use std::sync::Arc;
+
+use crate::file::FileSource;
+use crate::file_compression_type::FileCompressionType;
+use crate::file_scan_config::FileScanConfig;
+use crate::file_sink_config::FileSinkConfig;
+
+use arrow::datatypes::{Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion_catalog::Session;
+use datafusion_common::file_options::file_type::FileType;
+use datafusion_common::{internal_err, not_impl_err, GetExt, Result, Statistics};
+use datafusion_expr::Expr;
+use datafusion_physical_expr::{LexRequirement, PhysicalExpr};
+use datafusion_physical_plan::ExecutionPlan;
+
+use object_store::{ObjectMeta, ObjectStore};
+
+/// Default max records to scan to infer the schema
+pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
+
+/// This trait abstracts all the file format specific implementations
+/// from the [`TableProvider`]. This helps code re-utilization across
+/// providers that support the same file formats.
+///
+/// [`TableProvider`]: datafusion_catalog::TableProvider
+#[async_trait]
+pub trait FileFormat: Send + Sync + fmt::Debug {
+    /// Returns the table provider as [`Any`](std::any::Any) so that it can be
+    /// downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    /// Returns the extension for this FileFormat, e.g. "file.csv" -> csv
+    fn get_ext(&self) -> String;
+
+    /// Returns the extension for this FileFormat when compressed, e.g. "file.csv.gz" -> csv
+    fn get_ext_with_compression(
+        &self,
+        _file_compression_type: &FileCompressionType,
+    ) -> Result<String>;
+
+    /// Infer the common schema of the provided objects. The objects will usually
+    /// be analysed up to a given number of records or files (as specified in the
+    /// format config) then give the estimated common schema. This might fail if
+    /// the files have schemas that cannot be merged.
+    async fn infer_schema(
+        &self,
+        state: &dyn Session,
+        store: &Arc<dyn ObjectStore>,
+        objects: &[ObjectMeta],
+    ) -> Result<SchemaRef>;
+
+    /// Infer the statistics for the provided object. The cost and accuracy of the
+    /// estimated statistics might vary greatly between file formats.
+    ///
+    /// `table_schema` is the (combined) schema of the overall table
+    /// and may be a superset of the schema contained in this file.
+    ///
+    /// TODO: should the file source return statistics for only columns referred to in the table schema?
+    async fn infer_stats(
+        &self,
+        state: &dyn Session,
+        store: &Arc<dyn ObjectStore>,
+        table_schema: SchemaRef,
+        object: &ObjectMeta,
+    ) -> Result<Statistics>;
+
+    /// Take a list of files and convert it to the appropriate executor
+    /// according to this file format.
+    async fn create_physical_plan(
+        &self,
+        state: &dyn Session,
+        conf: FileScanConfig,
+        filters: Option<&Arc<dyn PhysicalExpr>>,
+    ) -> Result<Arc<dyn ExecutionPlan>>;
+
+    /// Take a list of files and the configuration to convert it to the
+    /// appropriate writer executor according to this file format.
+    async fn create_writer_physical_plan(
+        &self,
+        _input: Arc<dyn ExecutionPlan>,
+        _state: &dyn Session,
+        _conf: FileSinkConfig,
+        _order_requirements: Option<LexRequirement>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        not_impl_err!("Writer not implemented for this format")
+    }
+
+    /// Check if the specified file format has support for pushing down the provided filters within
+    /// the given schemas. Added initially to support the Parquet file format's ability to do this.
+    fn supports_filters_pushdown(
+        &self,
+        _file_schema: &Schema,
+        _table_schema: &Schema,
+        _filters: &[&Expr],
+    ) -> Result<FilePushdownSupport> {
+        Ok(FilePushdownSupport::NoSupport)
+    }
+
+    /// Return the related FileSource such as `CsvSource`, `JsonSource`, etc.
+    fn file_source(&self) -> Arc<dyn FileSource>;
+}
+
+/// An enum to distinguish between different states when determining if certain filters can be
+/// pushed down to file scanning
+#[derive(Debug, PartialEq)]
+pub enum FilePushdownSupport {
+    /// The file format/system being asked does not support any sort of pushdown. This should be
+    /// used even if the file format theoretically supports some sort of pushdown, but it's not
+    /// enabled or implemented yet.
+    NoSupport,
+    /// The file format/system being asked *does* support pushdown, but it can't make it work for
+    /// the provided filter/expression
+    NotSupportedForFilter,
+    /// The file format/system being asked *does* support pushdown and *can* make it work for the
+    /// provided filter/expression
+    Supported,
+}
+
+/// Factory for creating [`FileFormat`] instances based on session and command level options
+///
+/// Users can provide their own `FileFormatFactory` to support arbitrary file formats
+pub trait FileFormatFactory: Sync + Send + GetExt + fmt::Debug {
+    /// Initialize a [FileFormat] and configure based on session and command level options
+    fn create(
+        &self,
+        state: &dyn Session,
+        format_options: &HashMap<String, String>,
+    ) -> Result<Arc<dyn FileFormat>>;
+
+    /// Initialize a [FileFormat] with all options set to default values
+    fn default(&self) -> Arc<dyn FileFormat>;
+
+    /// Returns the table source as [`Any`] so that it can be
+    /// downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+}
+
+/// A container of [FileFormatFactory] which also implements [FileType].
+/// This enables converting a dyn FileFormat to a dyn FileType.
+/// The former trait is a superset of the latter trait, which includes execution time
+/// relevant methods. [FileType] is only used in logical planning and only implements
+/// the subset of methods required during logical planning.
+#[derive(Debug)]
+pub struct DefaultFileType {
+    file_format_factory: Arc<dyn FileFormatFactory>,
+}
+
+impl DefaultFileType {
+    /// Constructs a [DefaultFileType] wrapper from a [FileFormatFactory]
+    pub fn new(file_format_factory: Arc<dyn FileFormatFactory>) -> Self {
+        Self {
+            file_format_factory,
+        }
+    }
+
+    /// get a reference to the inner [FileFormatFactory] struct
+    pub fn as_format_factory(&self) -> &Arc<dyn FileFormatFactory> {
+        &self.file_format_factory
+    }
+}
+
+impl FileType for DefaultFileType {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+impl fmt::Display for DefaultFileType {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{:?}", self.file_format_factory)
+    }
+}
+
+impl GetExt for DefaultFileType {
+    fn get_ext(&self) -> String {
+        self.file_format_factory.get_ext()
+    }
+}
+
+/// Converts a [FileFormatFactory] to a [FileType]
+pub fn format_as_file_type(
+    file_format_factory: Arc<dyn FileFormatFactory>,
+) -> Arc<dyn FileType> {
+    Arc::new(DefaultFileType {
+        file_format_factory,
+    })
+}
+
+/// Converts a [FileType] to a [FileFormatFactory].
+/// Returns an error if the [FileType] cannot be
+/// downcasted to a [DefaultFileType].
+pub fn file_type_to_format(
+    file_type: &Arc<dyn FileType>,
+) -> Result<Arc<dyn FileFormatFactory>> {
+    match file_type
+        .as_ref()
+        .as_any()
+        .downcast_ref::<DefaultFileType>()
+    {
+        Some(source) => Ok(Arc::clone(&source.file_format_factory)),
+        _ => internal_err!("FileType was not DefaultFileType"),
+    }
+}
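
The new module keeps the `DefaultFileType` bridge between logical planning
(which only needs `FileType`) and execution (which needs the full
`FileFormatFactory`). A minimal sketch of the round trip the two helper
functions enable, for any factory implementation:

    use std::sync::Arc;

    use datafusion_common::file_options::file_type::FileType;
    use datafusion_common::Result;
    use datafusion_datasource::file_format::{
        file_type_to_format, format_as_file_type, FileFormatFactory,
    };

    fn round_trip(
        factory: Arc<dyn FileFormatFactory>,
    ) -> Result<Arc<dyn FileFormatFactory>> {
        // Wrap the factory in DefaultFileType for logical planning...
        let file_type: Arc<dyn FileType> = format_as_file_type(factory);
        // ...and recover it later by downcasting; this errors if the
        // FileType is not a DefaultFileType.
        file_type_to_format(&file_type)
    }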
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
index e60b02f9c9..0ed5975847 100644
--- a/datafusion/datasource/src/mod.rs
+++ b/datafusion/datasource/src/mod.rs
@@ -27,6 +27,7 @@
 pub mod display;
 pub mod file;
 pub mod file_compression_type;
+pub mod file_format;
 pub mod file_groups;
 pub mod file_meta;
 pub mod file_scan_config;

