This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 212f4245c9 Move `FileFormat` and related pieces to
`datafusion-datasource` (#14873)
212f4245c9 is described below
commit 212f4245c98fb60929bd544545432ae9235e5b35
Author: Adam Gutglick <[email protected]>
AuthorDate: Wed Feb 26 11:56:24 2025 +0000
Move `FileFormat` and related pieces to `datafusion-datasource` (#14873)
* initial work
* Trigger Build
* restore pub crate
* move things into parquet
* Fix docs references
---
datafusion/core/src/datasource/file_format/json.rs | 3 +-
datafusion/core/src/datasource/file_format/mod.rs | 394 ++-------------------
.../core/src/datasource/file_format/parquet.rs | 154 +++++++-
.../src/datasource/physical_plan/parquet/opener.rs | 2 +-
datafusion/datasource/src/file_format.rs | 227 ++++++++++++
datafusion/datasource/src/mod.rs | 1 +
6 files changed, 405 insertions(+), 376 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/json.rs
b/datafusion/core/src/datasource/file_format/json.rs
index 7a5aaf7c64..1a2aaf3af8 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -26,7 +26,7 @@ use std::sync::Arc;
use super::write::orchestration::spawn_writer_tasks_and_join;
use super::{
- Decoder, DecoderDeserializer, FileFormat, FileFormatFactory,
FileScanConfig,
+ Decoder, DecoderDeserializer, FileFormat, FileFormatFactory,
DEFAULT_SCHEMA_INFER_MAX_RECORD,
};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -52,6 +52,7 @@ use datafusion_common::{not_impl_err, GetExt,
DEFAULT_JSON_EXTENSION};
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::TaskContext;
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
diff --git a/datafusion/core/src/datasource/file_format/mod.rs
b/datafusion/core/src/datasource/file_format/mod.rs
index 2b46748d0a..5dbf4957a4 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -18,9 +18,6 @@
//! Module containing helper methods for the various file formats
//! See write.rs for write related helper methods
-/// Default max records to scan to infer the schema
-pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
-
pub mod arrow;
pub mod avro;
pub mod csv;
@@ -28,154 +25,21 @@ pub mod json;
pub mod options;
#[cfg(feature = "parquet")]
pub mod parquet;
-use datafusion_datasource::file::FileSource;
+
+use ::arrow::array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Buf;
+use bytes::Bytes;
+use datafusion_common::Result;
pub use datafusion_datasource::file_compression_type;
-use datafusion_datasource::file_scan_config::FileScanConfig;
+pub use datafusion_datasource::file_format::*;
pub use datafusion_datasource::write;
-
-use std::any::Any;
-use std::collections::{HashMap, VecDeque};
-use std::fmt::{self, Debug, Display};
-use std::sync::Arc;
-use std::task::Poll;
-
-use crate::arrow::array::RecordBatch;
-use crate::arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
-use crate::arrow::error::ArrowError;
-use crate::datasource::physical_plan::FileSinkConfig;
-use crate::error::Result;
-use crate::physical_plan::{ExecutionPlan, Statistics};
-
-use datafusion_catalog::Session;
-use datafusion_common::file_options::file_type::FileType;
-use datafusion_common::{internal_err, not_impl_err, GetExt};
-use datafusion_expr::Expr;
-use datafusion_physical_expr::PhysicalExpr;
-
-use async_trait::async_trait;
-use bytes::{Buf, Bytes};
-use datafusion_physical_expr_common::sort_expr::LexRequirement;
-use file_compression_type::FileCompressionType;
use futures::stream::BoxStream;
-use futures::{ready, Stream, StreamExt};
-use object_store::{ObjectMeta, ObjectStore};
-
-/// Factory for creating [`FileFormat`] instances based on session and command
level options
-///
-/// Users can provide their own `FileFormatFactory` to support arbitrary file
formats
-pub trait FileFormatFactory: Sync + Send + GetExt + Debug {
- /// Initialize a [FileFormat] and configure based on session and command
level options
- fn create(
- &self,
- state: &dyn Session,
- format_options: &HashMap<String, String>,
- ) -> Result<Arc<dyn FileFormat>>;
-
- /// Initialize a [FileFormat] with all options set to default values
- fn default(&self) -> Arc<dyn FileFormat>;
-
- /// Returns the table source as [`Any`] so that it can be
- /// downcast to a specific implementation.
- fn as_any(&self) -> &dyn Any;
-}
-
-/// This trait abstracts all the file format specific implementations
-/// from the [`TableProvider`]. This helps code re-utilization across
-/// providers that support the same file formats.
-///
-/// [`TableProvider`]: crate::catalog::TableProvider
-#[async_trait]
-pub trait FileFormat: Send + Sync + Debug {
- /// Returns the table provider as [`Any`](std::any::Any) so that it can be
- /// downcast to a specific implementation.
- fn as_any(&self) -> &dyn Any;
-
- /// Returns the extension for this FileFormat, e.g. "file.csv" -> csv
- fn get_ext(&self) -> String;
-
- /// Returns the extension for this FileFormat when compressed, e.g.
"file.csv.gz" -> csv
- fn get_ext_with_compression(
- &self,
- _file_compression_type: &FileCompressionType,
- ) -> Result<String>;
-
- /// Infer the common schema of the provided objects. The objects will
usually
- /// be analysed up to a given number of records or files (as specified in
the
- /// format config) then give the estimated common schema. This might fail
if
- /// the files have schemas that cannot be merged.
- async fn infer_schema(
- &self,
- state: &dyn Session,
- store: &Arc<dyn ObjectStore>,
- objects: &[ObjectMeta],
- ) -> Result<SchemaRef>;
-
- /// Infer the statistics for the provided object. The cost and accuracy of
the
- /// estimated statistics might vary greatly between file formats.
- ///
- /// `table_schema` is the (combined) schema of the overall table
- /// and may be a superset of the schema contained in this file.
- ///
- /// TODO: should the file source return statistics for only columns
referred to in the table schema?
- async fn infer_stats(
- &self,
- state: &dyn Session,
- store: &Arc<dyn ObjectStore>,
- table_schema: SchemaRef,
- object: &ObjectMeta,
- ) -> Result<Statistics>;
-
- /// Take a list of files and convert it to the appropriate executor
- /// according to this file format.
- async fn create_physical_plan(
- &self,
- state: &dyn Session,
- conf: FileScanConfig,
- filters: Option<&Arc<dyn PhysicalExpr>>,
- ) -> Result<Arc<dyn ExecutionPlan>>;
-
- /// Take a list of files and the configuration to convert it to the
- /// appropriate writer executor according to this file format.
- async fn create_writer_physical_plan(
- &self,
- _input: Arc<dyn ExecutionPlan>,
- _state: &dyn Session,
- _conf: FileSinkConfig,
- _order_requirements: Option<LexRequirement>,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- not_impl_err!("Writer not implemented for this format")
- }
-
- /// Check if the specified file format has support for pushing down the
provided filters within
- /// the given schemas. Added initially to support the Parquet file
format's ability to do this.
- fn supports_filters_pushdown(
- &self,
- _file_schema: &Schema,
- _table_schema: &Schema,
- _filters: &[&Expr],
- ) -> Result<FilePushdownSupport> {
- Ok(FilePushdownSupport::NoSupport)
- }
-
- /// Return the related FileSource such as `CsvSource`, `JsonSource`, etc.
- fn file_source(&self) -> Arc<dyn FileSource>;
-}
-
-/// An enum to distinguish between different states when determining if
certain filters can be
-/// pushed down to file scanning
-#[derive(Debug, PartialEq)]
-pub enum FilePushdownSupport {
- /// The file format/system being asked does not support any sort of
pushdown. This should be
- /// used even if the file format theoretically supports some sort of
pushdown, but it's not
- /// enabled or implemented yet.
- NoSupport,
- /// The file format/system being asked *does* support pushdown, but it
can't make it work for
- /// the provided filter/expression
- NotSupportedForFilter,
- /// The file format/system being asked *does* support pushdown and *can*
make it work for the
- /// provided filter/expression
- Supported,
-}
+use futures::StreamExt as _;
+use futures::{ready, Stream};
+use std::collections::VecDeque;
+use std::fmt;
+use std::task::Poll;
/// Possible outputs of a [`BatchDeserializer`].
#[derive(Debug, PartialEq)]
@@ -191,7 +55,7 @@ pub enum DeserializerOutput {
/// Trait defining a scheme for deserializing byte streams into structured
data.
/// Implementors of this trait are responsible for converting raw bytes into
/// `RecordBatch` objects.
-pub trait BatchDeserializer<T>: Send + Debug {
+pub trait BatchDeserializer<T>: Send + fmt::Debug {
/// Feeds a message for deserialization, updating the internal state of
/// this `BatchDeserializer`. Note that one can call this function multiple
/// times before calling `next`, which will queue multiple messages for
@@ -217,7 +81,7 @@ pub trait BatchDeserializer<T>: Send + Debug {
/// [`arrow::csv::reader::Decoder`]: ::arrow::csv::reader::Decoder
/// [`Decoder::decode`]: ::arrow::json::reader::Decoder::decode
/// [`Decoder::flush`]: ::arrow::json::reader::Decoder::flush
-pub(crate) trait Decoder: Send + Debug {
+pub(crate) trait Decoder: Send + fmt::Debug {
/// See [`arrow::json::reader::Decoder::decode`].
///
/// [`arrow::json::reader::Decoder::decode`]:
::arrow::json::reader::Decoder::decode
@@ -232,7 +96,7 @@ pub(crate) trait Decoder: Send + Debug {
fn can_flush_early(&self) -> bool;
}
-impl<T: Decoder> Debug for DecoderDeserializer<T> {
+impl<T: Decoder> fmt::Debug for DecoderDeserializer<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Deserializer")
.field("buffered_queue", &self.buffered_queue)
@@ -302,7 +166,7 @@ pub(crate) struct DecoderDeserializer<T: Decoder> {
impl<T: Decoder> DecoderDeserializer<T> {
/// Creates a new `DecoderDeserializer` with the provided decoder.
- pub(crate) fn new(decoder: T) -> Self {
+ pub fn new(decoder: T) -> Self {
DecoderDeserializer {
decoder,
buffered_queue: VecDeque::new(),
@@ -336,237 +200,29 @@ pub(crate) fn deserialize_stream<'a>(
.boxed()
}
-/// A container of [FileFormatFactory] which also implements [FileType].
-/// This enables converting a dyn FileFormat to a dyn FileType.
-/// The former trait is a superset of the latter trait, which includes
execution time
-/// relevant methods. [FileType] is only used in logical planning and only
implements
-/// the subset of methods required during logical planning.
-#[derive(Debug)]
-pub struct DefaultFileType {
- file_format_factory: Arc<dyn FileFormatFactory>,
-}
-
-impl DefaultFileType {
- /// Constructs a [DefaultFileType] wrapper from a [FileFormatFactory]
- pub fn new(file_format_factory: Arc<dyn FileFormatFactory>) -> Self {
- Self {
- file_format_factory,
- }
- }
-
- /// get a reference to the inner [FileFormatFactory] struct
- pub fn as_format_factory(&self) -> &Arc<dyn FileFormatFactory> {
- &self.file_format_factory
- }
-}
-
-impl FileType for DefaultFileType {
- fn as_any(&self) -> &dyn Any {
- self
- }
-}
-
-impl Display for DefaultFileType {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "{:?}", self.file_format_factory)
- }
-}
-
-impl GetExt for DefaultFileType {
- fn get_ext(&self) -> String {
- self.file_format_factory.get_ext()
- }
-}
-
-/// Converts a [FileFormatFactory] to a [FileType]
-pub fn format_as_file_type(
- file_format_factory: Arc<dyn FileFormatFactory>,
-) -> Arc<dyn FileType> {
- Arc::new(DefaultFileType {
- file_format_factory,
- })
-}
-
-/// Converts a [FileType] to a [FileFormatFactory].
-/// Returns an error if the [FileType] cannot be
-/// downcasted to a [DefaultFileType].
-pub fn file_type_to_format(
- file_type: &Arc<dyn FileType>,
-) -> Result<Arc<dyn FileFormatFactory>> {
- match file_type
- .as_ref()
- .as_any()
- .downcast_ref::<DefaultFileType>()
- {
- Some(source) => Ok(Arc::clone(&source.file_format_factory)),
- _ => internal_err!("FileType was not DefaultFileType"),
- }
-}
-
-/// Create a new field with the specified data type, copying the other
-/// properties from the input field
-fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
- Arc::new(field.as_ref().clone().with_data_type(new_type))
-}
-
-/// Transform a schema to use view types for Utf8 and Binary
-///
-/// See [parquet::ParquetFormat::force_view_types] for details
-pub fn transform_schema_to_view(schema: &Schema) -> Schema {
- let transformed_fields: Vec<Arc<Field>> = schema
- .fields
- .iter()
- .map(|field| match field.data_type() {
- DataType::Utf8 | DataType::LargeUtf8 => {
- field_with_new_type(field, DataType::Utf8View)
- }
- DataType::Binary | DataType::LargeBinary => {
- field_with_new_type(field, DataType::BinaryView)
- }
- _ => Arc::clone(field),
- })
- .collect();
- Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
-}
-
-/// Coerces the file schema if the table schema uses a view type.
-#[cfg(not(target_arch = "wasm32"))]
-pub(crate) fn coerce_file_schema_to_view_type(
- table_schema: &Schema,
- file_schema: &Schema,
-) -> Option<Schema> {
- let mut transform = false;
- let table_fields: HashMap<_, _> = table_schema
- .fields
- .iter()
- .map(|f| {
- let dt = f.data_type();
- if dt.equals_datatype(&DataType::Utf8View)
- || dt.equals_datatype(&DataType::BinaryView)
- {
- transform = true;
- }
- (f.name(), dt)
- })
- .collect();
-
- if !transform {
- return None;
- }
-
- let transformed_fields: Vec<Arc<Field>> = file_schema
- .fields
- .iter()
- .map(
- |field| match (table_fields.get(field.name()), field.data_type()) {
- (Some(DataType::Utf8View), DataType::Utf8 |
DataType::LargeUtf8) => {
- field_with_new_type(field, DataType::Utf8View)
- }
- (
- Some(DataType::BinaryView),
- DataType::Binary | DataType::LargeBinary,
- ) => field_with_new_type(field, DataType::BinaryView),
- _ => Arc::clone(field),
- },
- )
- .collect();
-
- Some(Schema::new_with_metadata(
- transformed_fields,
- file_schema.metadata.clone(),
- ))
-}
-
-/// Transform a schema so that any binary types are strings
-pub fn transform_binary_to_string(schema: &Schema) -> Schema {
- let transformed_fields: Vec<Arc<Field>> = schema
- .fields
- .iter()
- .map(|field| match field.data_type() {
- DataType::Binary => field_with_new_type(field, DataType::Utf8),
- DataType::LargeBinary => field_with_new_type(field,
DataType::LargeUtf8),
- DataType::BinaryView => field_with_new_type(field,
DataType::Utf8View),
- _ => Arc::clone(field),
- })
- .collect();
- Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
-}
-
-/// If the table schema uses a string type, coerce the file schema to use a
string type.
-///
-/// See [parquet::ParquetFormat::binary_as_string] for details
-#[cfg(not(target_arch = "wasm32"))]
-pub(crate) fn coerce_file_schema_to_string_type(
- table_schema: &Schema,
- file_schema: &Schema,
-) -> Option<Schema> {
- let mut transform = false;
- let table_fields: HashMap<_, _> = table_schema
- .fields
- .iter()
- .map(|f| (f.name(), f.data_type()))
- .collect();
- let transformed_fields: Vec<Arc<Field>> = file_schema
- .fields
- .iter()
- .map(
- |field| match (table_fields.get(field.name()), field.data_type()) {
- // table schema uses string type, coerce the file schema to
use string type
- (
- Some(DataType::Utf8),
- DataType::Binary | DataType::LargeBinary |
DataType::BinaryView,
- ) => {
- transform = true;
- field_with_new_type(field, DataType::Utf8)
- }
- // table schema uses large string type, coerce the file schema
to use large string type
- (
- Some(DataType::LargeUtf8),
- DataType::Binary | DataType::LargeBinary |
DataType::BinaryView,
- ) => {
- transform = true;
- field_with_new_type(field, DataType::LargeUtf8)
- }
- // table schema uses string view type, coerce the file schema
to use view type
- (
- Some(DataType::Utf8View),
- DataType::Binary | DataType::LargeBinary |
DataType::BinaryView,
- ) => {
- transform = true;
- field_with_new_type(field, DataType::Utf8View)
- }
- _ => Arc::clone(field),
- },
- )
- .collect();
-
- if !transform {
- None
- } else {
- Some(Schema::new_with_metadata(
- transformed_fields,
- file_schema.metadata.clone(),
- ))
- }
-}
-
#[cfg(test)]
pub(crate) mod test_util {
+ use std::fmt::{self, Display};
use std::ops::Range;
- use std::sync::Mutex;
+ use std::sync::{Arc, Mutex};
- use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::test::object_store::local_unpartitioned_file;
+ use async_trait::async_trait;
use bytes::Bytes;
+ use datafusion_catalog::Session;
+ use datafusion_common::Result;
+ use datafusion_datasource::file_format::FileFormat;
+ use datafusion_datasource::file_scan_config::FileScanConfig;
+ use datafusion_physical_plan::ExecutionPlan;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::{
Attributes, GetOptions, GetResult, GetResultPayload, ListResult,
MultipartUpload,
- PutMultipartOpts, PutOptions, PutPayload, PutResult,
+ ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload,
PutResult,
};
pub async fn scan_format(
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs
b/datafusion/core/src/datasource/file_format/parquet.rs
index e9ecff7baf..98aa24ad00 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -25,11 +25,7 @@ use std::sync::Arc;
use super::write::demux::DemuxedStreamReceiver;
use super::write::{create_writer, SharedBuffer};
-use super::{
- coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
- transform_binary_to_string, transform_schema_to_view, FileFormat,
FileFormatFactory,
- FilePushdownSupport,
-};
+use super::{FileFormat, FileFormatFactory, FilePushdownSupport};
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -47,6 +43,7 @@ use crate::physical_plan::{
};
use arrow::compute::sum;
+use arrow_schema::{DataType, Field, FieldRef};
use datafusion_catalog::Session;
use datafusion_common::config::{ConfigField, ConfigFileType,
TableParquetOptions};
use datafusion_common::parsers::CompressionTypeVariant;
@@ -471,6 +468,153 @@ impl FileFormat for ParquetFormat {
}
}
+/// Coerces the file schema if the table schema uses a view type; returns `None` otherwise.
+#[cfg(not(target_arch = "wasm32"))]
+pub fn coerce_file_schema_to_view_type(
+ table_schema: &Schema,
+ file_schema: &Schema,
+) -> Option<Schema> {
+ let mut transform = false;
+ let table_fields: HashMap<_, _> = table_schema
+ .fields
+ .iter()
+ .map(|f| {
+ let dt = f.data_type();
+ if dt.equals_datatype(&DataType::Utf8View)
+ || dt.equals_datatype(&DataType::BinaryView)
+ {
+ transform = true;
+ }
+ (f.name(), dt)
+ })
+ .collect();
+
+ if !transform {
+ return None;
+ }
+
+ let transformed_fields: Vec<Arc<Field>> = file_schema
+ .fields
+ .iter()
+ .map(
+ |field| match (table_fields.get(field.name()), field.data_type()) {
+ (Some(DataType::Utf8View), DataType::Utf8 |
DataType::LargeUtf8) => {
+ field_with_new_type(field, DataType::Utf8View)
+ }
+ (
+ Some(DataType::BinaryView),
+ DataType::Binary | DataType::LargeBinary,
+ ) => field_with_new_type(field, DataType::BinaryView),
+ _ => Arc::clone(field),
+ },
+ )
+ .collect();
+
+ Some(Schema::new_with_metadata(
+ transformed_fields,
+ file_schema.metadata.clone(),
+ ))
+}
+
+/// If the table schema uses a string type, coerce the file schema to use a
string type.
+///
+/// See [ParquetFormat::binary_as_string] for details. Returns `None` if no field was coerced.
+#[cfg(not(target_arch = "wasm32"))]
+pub fn coerce_file_schema_to_string_type(
+ table_schema: &Schema,
+ file_schema: &Schema,
+) -> Option<Schema> {
+ let mut transform = false;
+ let table_fields: HashMap<_, _> = table_schema
+ .fields
+ .iter()
+ .map(|f| (f.name(), f.data_type()))
+ .collect();
+ let transformed_fields: Vec<Arc<Field>> = file_schema
+ .fields
+ .iter()
+ .map(
+ |field| match (table_fields.get(field.name()), field.data_type()) {
+ // table schema uses string type, coerce the file schema to
use string type
+ (
+ Some(DataType::Utf8),
+ DataType::Binary | DataType::LargeBinary |
DataType::BinaryView,
+ ) => {
+ transform = true;
+ field_with_new_type(field, DataType::Utf8)
+ }
+ // table schema uses large string type, coerce the file schema
to use large string type
+ (
+ Some(DataType::LargeUtf8),
+ DataType::Binary | DataType::LargeBinary |
DataType::BinaryView,
+ ) => {
+ transform = true;
+ field_with_new_type(field, DataType::LargeUtf8)
+ }
+ // table schema uses string view type, coerce the file schema
to use view type
+ (
+ Some(DataType::Utf8View),
+ DataType::Binary | DataType::LargeBinary |
DataType::BinaryView,
+ ) => {
+ transform = true;
+ field_with_new_type(field, DataType::Utf8View)
+ }
+ _ => Arc::clone(field),
+ },
+ )
+ .collect();
+
+ if !transform {
+ None
+ } else {
+ Some(Schema::new_with_metadata(
+ transformed_fields,
+ file_schema.metadata.clone(),
+ ))
+ }
+}
+
+/// Create a new field with the specified data type, copying the other
+/// properties from the input field
+fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
+ Arc::new(field.as_ref().clone().with_data_type(new_type))
+}
+
+/// Transform a schema to use view types for Utf8/LargeUtf8 and Binary/LargeBinary
+///
+/// See [ParquetFormat::force_view_types] for details
+pub fn transform_schema_to_view(schema: &Schema) -> Schema {
+ let transformed_fields: Vec<Arc<Field>> = schema
+ .fields
+ .iter()
+ .map(|field| match field.data_type() {
+ DataType::Utf8 | DataType::LargeUtf8 => {
+ field_with_new_type(field, DataType::Utf8View)
+ }
+ DataType::Binary | DataType::LargeBinary => {
+ field_with_new_type(field, DataType::BinaryView)
+ }
+ _ => Arc::clone(field),
+ })
+ .collect();
+ Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
+}
+
+/// Transform a schema so that any binary types are strings (Binary -> Utf8, LargeBinary -> LargeUtf8, BinaryView -> Utf8View)
+pub fn transform_binary_to_string(schema: &Schema) -> Schema {
+ let transformed_fields: Vec<Arc<Field>> = schema
+ .fields
+ .iter()
+ .map(|field| match field.data_type() {
+ DataType::Binary => field_with_new_type(field, DataType::Utf8),
+ DataType::LargeBinary => field_with_new_type(field,
DataType::LargeUtf8),
+ DataType::BinaryView => field_with_new_type(field,
DataType::Utf8View),
+ _ => Arc::clone(field),
+ })
+ .collect();
+ Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
+}
+
/// [`MetadataFetch`] adapter for reading bytes from an [`ObjectStore`]
struct ObjectStoreFetch<'a> {
store: &'a dyn ObjectStore,
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
index 138b448979..4230a1bdce 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs
@@ -19,7 +19,7 @@
use std::sync::Arc;
-use crate::datasource::file_format::{
+use crate::datasource::file_format::parquet::{
coerce_file_schema_to_string_type, coerce_file_schema_to_view_type,
};
use
crate::datasource::physical_plan::parquet::page_filter::PagePruningAccessPlanFilter;
diff --git a/datafusion/datasource/src/file_format.rs
b/datafusion/datasource/src/file_format.rs
new file mode 100644
index 0000000000..aa0338fab7
--- /dev/null
+++ b/datafusion/datasource/src/file_format.rs
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Core file format abstractions: the [`FileFormat`] and [`FileFormatFactory`]
+//! traits, [`FilePushdownSupport`], and the [`DefaultFileType`] adapter
+
+use std::any::Any;
+use std::collections::HashMap;
+use std::fmt;
+use std::sync::Arc;
+
+use crate::file::FileSource;
+use crate::file_compression_type::FileCompressionType;
+use crate::file_scan_config::FileScanConfig;
+use crate::file_sink_config::FileSinkConfig;
+
+use arrow::datatypes::{Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion_catalog::Session;
+use datafusion_common::file_options::file_type::FileType;
+use datafusion_common::{internal_err, not_impl_err, GetExt, Result,
Statistics};
+use datafusion_expr::Expr;
+use datafusion_physical_expr::{LexRequirement, PhysicalExpr};
+use datafusion_physical_plan::ExecutionPlan;
+
+use object_store::{ObjectMeta, ObjectStore};
+
+/// Default max records to scan to infer the schema
+pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
+
+/// This trait abstracts all the file format specific implementations
+/// from the [`TableProvider`]. This helps code reuse across
+/// providers that support the same file formats.
+///
+/// [`TableProvider`]: datafusion_catalog::TableProvider
+#[async_trait]
+pub trait FileFormat: Send + Sync + fmt::Debug {
+ /// Returns this file format as [`Any`](std::any::Any) so that it can be
+ /// downcast to a specific implementation.
+ fn as_any(&self) -> &dyn Any;
+
+ /// Returns the extension for this FileFormat, e.g. "file.csv" -> csv
+ fn get_ext(&self) -> String;
+
+ /// Returns the extension for this FileFormat when compressed, e.g.
"file.csv.gz" -> csv
+ fn get_ext_with_compression(
+ &self,
+ _file_compression_type: &FileCompressionType,
+ ) -> Result<String>;
+
+ /// Infer the common schema of the provided objects. The objects will
usually
+ /// be analysed up to a given number of records or files (as specified in
the
+ /// format config) then give the estimated common schema. This might fail
if
+ /// the files have schemas that cannot be merged.
+ async fn infer_schema(
+ &self,
+ state: &dyn Session,
+ store: &Arc<dyn ObjectStore>,
+ objects: &[ObjectMeta],
+ ) -> Result<SchemaRef>;
+
+ /// Infer the statistics for the provided object. The cost and accuracy of
the
+ /// estimated statistics might vary greatly between file formats.
+ ///
+ /// `table_schema` is the (combined) schema of the overall table
+ /// and may be a superset of the schema contained in this file.
+ ///
+ /// TODO: should the file source return statistics for only columns
referred to in the table schema?
+ async fn infer_stats(
+ &self,
+ state: &dyn Session,
+ store: &Arc<dyn ObjectStore>,
+ table_schema: SchemaRef,
+ object: &ObjectMeta,
+ ) -> Result<Statistics>;
+
+ /// Take a list of files and convert it to the appropriate executor
+ /// according to this file format.
+ async fn create_physical_plan(
+ &self,
+ state: &dyn Session,
+ conf: FileScanConfig,
+ filters: Option<&Arc<dyn PhysicalExpr>>,
+ ) -> Result<Arc<dyn ExecutionPlan>>;
+
+ /// Take a list of files and the configuration to convert it to the
+ /// appropriate writer executor according to this file format.
+ async fn create_writer_physical_plan(
+ &self,
+ _input: Arc<dyn ExecutionPlan>,
+ _state: &dyn Session,
+ _conf: FileSinkConfig,
+ _order_requirements: Option<LexRequirement>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ not_impl_err!("Writer not implemented for this format")
+ }
+
+ /// Check if the specified file format has support for pushing down the
provided filters within
+ /// the given schemas. Added initially to support the Parquet file
format's ability to do this.
+ fn supports_filters_pushdown(
+ &self,
+ _file_schema: &Schema,
+ _table_schema: &Schema,
+ _filters: &[&Expr],
+ ) -> Result<FilePushdownSupport> {
+ Ok(FilePushdownSupport::NoSupport)
+ }
+
+ /// Return the related FileSource such as `CsvSource`, `JsonSource`, etc.
+ fn file_source(&self) -> Arc<dyn FileSource>;
+}
+
+/// An enum to distinguish between different states when determining if
certain filters can be
+/// pushed down to file scanning
+#[derive(Debug, PartialEq)]
+pub enum FilePushdownSupport {
+ /// The file format/system being asked does not support any sort of
pushdown. This should be
+ /// used even if the file format theoretically supports some sort of
pushdown, but it's not
+ /// enabled or implemented yet.
+ NoSupport,
+ /// The file format/system being asked *does* support pushdown, but it
can't make it work for
+ /// the provided filter/expression
+ NotSupportedForFilter,
+ /// The file format/system being asked *does* support pushdown and *can*
make it work for the
+ /// provided filter/expression
+ Supported,
+}
+
+/// Factory for creating [`FileFormat`] instances based on session and command
level options
+///
+/// Users can provide their own `FileFormatFactory` to support arbitrary file
formats
+pub trait FileFormatFactory: Sync + Send + GetExt + fmt::Debug {
+ /// Initialize a [FileFormat] and configure based on session and command
level options
+ fn create(
+ &self,
+ state: &dyn Session,
+ format_options: &HashMap<String, String>,
+ ) -> Result<Arc<dyn FileFormat>>;
+
+ /// Initialize a [FileFormat] with all options set to default values
+ fn default(&self) -> Arc<dyn FileFormat>;
+
+ /// Returns this factory as [`Any`] so that it can be
+ /// downcast to a specific implementation.
+ fn as_any(&self) -> &dyn Any;
+}
+
+/// A container of [FileFormatFactory] which also implements [FileType].
+/// This enables converting a dyn FileFormatFactory to a dyn FileType.
+/// The former trait is a superset of the latter trait, which includes
execution time
+/// relevant methods. [FileType] is only used in logical planning and only
implements
+/// the subset of methods required during logical planning.
+#[derive(Debug)]
+pub struct DefaultFileType {
+ file_format_factory: Arc<dyn FileFormatFactory>,
+}
+
+impl DefaultFileType {
+ /// Constructs a [DefaultFileType] wrapper from a [FileFormatFactory]
+ pub fn new(file_format_factory: Arc<dyn FileFormatFactory>) -> Self {
+ Self {
+ file_format_factory,
+ }
+ }
+
+ /// Get a reference to the inner [FileFormatFactory] trait object
+ pub fn as_format_factory(&self) -> &Arc<dyn FileFormatFactory> {
+ &self.file_format_factory
+ }
+}
+
+impl FileType for DefaultFileType {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+}
+
+impl fmt::Display for DefaultFileType {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "{:?}", self.file_format_factory)
+ }
+}
+
+impl GetExt for DefaultFileType {
+ fn get_ext(&self) -> String {
+ self.file_format_factory.get_ext()
+ }
+}
+
+/// Converts a [FileFormatFactory] to a [FileType] by wrapping it in a [DefaultFileType]
+pub fn format_as_file_type(
+ file_format_factory: Arc<dyn FileFormatFactory>,
+) -> Arc<dyn FileType> {
+ Arc::new(DefaultFileType {
+ file_format_factory,
+ })
+}
+
+/// Converts a [FileType] to a [FileFormatFactory].
+/// Returns an error if the [FileType] cannot be
+/// downcast to a [DefaultFileType].
+pub fn file_type_to_format(
+ file_type: &Arc<dyn FileType>,
+) -> Result<Arc<dyn FileFormatFactory>> {
+ match file_type
+ .as_ref()
+ .as_any()
+ .downcast_ref::<DefaultFileType>()
+ {
+ Some(source) => Ok(Arc::clone(&source.file_format_factory)),
+ _ => internal_err!("FileType was not DefaultFileType"),
+ }
+}
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
index e60b02f9c9..0ed5975847 100644
--- a/datafusion/datasource/src/mod.rs
+++ b/datafusion/datasource/src/mod.rs
@@ -27,6 +27,7 @@
pub mod display;
pub mod file;
pub mod file_compression_type;
+pub mod file_format;
pub mod file_groups;
pub mod file_meta;
pub mod file_scan_config;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]