alamb commented on code in PR #18080:
URL: https://github.com/apache/datafusion/pull/18080#discussion_r2433892567
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -1638,15 +191,18 @@ mod tests {
let ctx = SessionContext::new();
let testdata = datafusion_test_data();
let filename = format!("{testdata}/aggregate_simple.csv");
- let table_path = ListingTableUrl::parse(filename).unwrap();
+ let table_path = ListingTableUrl::parse(filename)?;
// Test default schema source
let config = ListingTableConfig::new(table_path.clone());
assert_eq!(config.schema_source(), SchemaSource::Unset);
// Test schema source after setting a schema explicitly
let provided_schema = create_test_schema();
-    let config_with_schema = config.clone().with_schema(provided_schema.clone());
+ let config_with_schema = config
+ .clone()
+ .with_listing_options(mock_listing_config_options())
Review Comment:
why is the `mock_listing_config_options` needed?
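A plausible answer, judging from the `debug_assert!` inside `with_schema` quoted later in this diff: `cfg!(test)` is resolved when the crate containing it is compiled, so once this code moves into `datafusion-catalog-listing`, the `|| cfg!(test)` escape hatch no longer fires for tests that live in `datafusion/core`, and those tests have to set options first. A minimal sketch of that behavior (illustrative only, not code from this PR):

```rust
// Imagine this lives in a library crate. When the crate is compiled as a
// dependency (i.e. without `--test`), `cfg!(test)` is `false` here, even
// while a downstream crate such as datafusion/core runs its own tests.
pub fn with_schema_check(options_is_some: bool) {
    debug_assert!(
        options_is_some || cfg!(test),
        "with_schema called without options set"
    );
}

fn main() {
    // A downstream debug-build test must therefore satisfy the first
    // operand, e.g. by calling with_listing_options(...) beforehand.
    with_schema_check(true);
}
```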
##########
datafusion/catalog-listing/src/mod.rs:
##########
@@ -24,4 +24,1491 @@
// https://github.com/apache/datafusion/issues/11143
#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]
+use crate::helpers::{expr_applicable_for_cols, pruned_partition_list};
+use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
+use async_trait::async_trait;
+use datafusion_catalog::Session;
+use datafusion_catalog::{ScanArgs, ScanResult, TableProvider};
+use datafusion_common::stats::Precision;
+use datafusion_common::{
+    config_err, internal_datafusion_err, internal_err, plan_err, project_schema,
+ Constraints, DataFusionError, SchemaExt, Statistics,
+};
+use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_compression_type::FileCompressionType;
+use datafusion_datasource::file_format::FileFormat;
+use datafusion_datasource::file_groups::FileGroup;
+use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
+use datafusion_datasource::file_sink_config::FileSinkConfig;
+use datafusion_datasource::schema_adapter::{
+ DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
+};
+pub use datafusion_datasource::{
+ compute_all_files_statistics, ListingTableUrl, PartitionedFile,
+};
+use datafusion_execution::cache::cache_manager::FileStatisticsCache;
+use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
+use datafusion_execution::config::SessionConfig;
+use datafusion_expr::dml::InsertOp;
+use datafusion_expr::execution_props::ExecutionProps;
+use datafusion_expr::{Expr, SortExpr, TableProviderFilterPushDown, TableType};
+use datafusion_physical_expr::create_lex_ordering;
+use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_plan::empty::EmptyExec;
+use datafusion_physical_plan::ExecutionPlan;
+use futures::{future, stream, Stream, StreamExt, TryStreamExt};
+use itertools::Itertools;
+use object_store::ObjectStore;
+use std::any::Any;
+use std::collections::HashMap;
+use std::str::FromStr;
+use std::sync::Arc;
+
pub mod helpers;
+
+/// Indicates the source of the schema for a [`ListingTable`]
+// PartialEq required for assert_eq! in tests
+#[derive(Debug, Clone, Copy, PartialEq, Default)]
+pub enum SchemaSource {
+ /// Schema is not yet set (initial state)
+ #[default]
+ Unset,
+ /// Schema was inferred from first table_path
+ Inferred,
+ /// Schema was specified explicitly via with_schema
+ Specified,
+}
+
+/// Configuration for creating a [`ListingTable`]
+///
+/// # Schema Evolution Support
+///
+/// This configuration supports schema evolution through the optional
+/// [`SchemaAdapterFactory`]. You might want to override the default factory when you need:
+///
+/// - **Type coercion requirements**: When you need custom logic for converting between
+///   different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8)
+/// - **Column mapping**: You need to map columns with a legacy name to a new name
+/// - **Custom handling of missing columns**: By default they are filled in with nulls, but you may e.g. want to fill them in with `0` or `""`.
+///
+/// If not specified, a [`DefaultSchemaAdapterFactory`] will be used, which handles
+/// basic schema compatibility cases.
+///
+#[derive(Debug, Clone, Default)]
+pub struct ListingTableConfig {
Review Comment:
if we are moving this all into another crate anyway, maybe we can break it
into smaller modules too (e.g. `datafusion/catalog-listing/src/config.rs`
for `ListingTableConfig`)
Can totally be done as a follow-on
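A sketch of what that follow-on split could look like (module names are hypothetical, and the re-exports would keep existing import paths compiling):

```rust
// datafusion/catalog-listing/src/mod.rs -- hypothetical follow-on layout
pub mod config;  // ListingTableConfig, SchemaSource
pub mod options; // ListingOptions
pub mod table;   // ListingTable and its TableProvider impl

// Re-export at the crate root so downstream code can keep writing
// `datafusion_catalog_listing::ListingTableConfig` and friends unchanged.
pub use config::{ListingTableConfig, SchemaSource};
pub use options::ListingOptions;
pub use table::ListingTable;
```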
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -15,226 +15,40 @@
// specific language governing permissions and limitations
// under the License.
-//! The table implementation.
-
-use super::{
- helpers::{expr_applicable_for_cols, pruned_partition_list},
- ListingTableUrl, PartitionedFile,
-};
-use crate::{
-    datasource::file_format::{file_compression_type::FileCompressionType, FileFormat},
- datasource::physical_plan::FileSinkConfig,
- execution::context::SessionState,
-};
-use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
-use arrow_schema::Schema;
+use crate::execution::SessionState;
use async_trait::async_trait;
-use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
-use datafusion_common::{
-    config_datafusion_err, config_err, internal_datafusion_err, internal_err, plan_err,
-    project_schema, stats::Precision, Constraints, DataFusionError, Result, SchemaExt,
-};
-use datafusion_datasource::{
- compute_all_files_statistics,
- file::FileSource,
- file_groups::FileGroup,
- file_scan_config::{FileScanConfig, FileScanConfigBuilder},
-    schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory},
-};
-use datafusion_execution::{
-    cache::{cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache},
- config::SessionConfig,
-};
-use datafusion_expr::execution_props::ExecutionProps;
-use datafusion_expr::{
- dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType,
-};
-use datafusion_physical_expr::create_lex_ordering;
-use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
-use datafusion_physical_expr_common::sort_expr::LexOrdering;
-use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
-use futures::{future, stream, Stream, StreamExt, TryStreamExt};
-use itertools::Itertools;
-use object_store::ObjectStore;
-use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc};
-
-/// Indicates the source of the schema for a [`ListingTable`]
-// PartialEq required for assert_eq! in tests
-#[derive(Debug, Clone, Copy, PartialEq, Default)]
-pub enum SchemaSource {
- /// Schema is not yet set (initial state)
- #[default]
- Unset,
- /// Schema was inferred from first table_path
- Inferred,
- /// Schema was specified explicitly via with_schema
- Specified,
-}
-
-/// Configuration for creating a [`ListingTable`]
-///
-/// # Schema Evolution Support
-///
-/// This configuration supports schema evolution through the optional
-/// [`SchemaAdapterFactory`]. You might want to override the default factory when you need:
-///
-/// - **Type coercion requirements**: When you need custom logic for converting between
-///   different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8)
-/// - **Column mapping**: You need to map columns with a legacy name to a new name
-/// - **Custom handling of missing columns**: By default they are filled in with nulls, but you may e.g. want to fill them in with `0` or `""`.
-///
-/// If not specified, a [`DefaultSchemaAdapterFactory`] will be used, which handles
-/// basic schema compatibility cases.
-///
-#[derive(Debug, Clone, Default)]
-pub struct ListingTableConfig {
- /// Paths on the `ObjectStore` for creating `ListingTable`.
- /// They should share the same schema and object store.
- pub table_paths: Vec<ListingTableUrl>,
- /// Optional `SchemaRef` for the to be created `ListingTable`.
- ///
- /// See details on [`ListingTableConfig::with_schema`]
- pub file_schema: Option<SchemaRef>,
- /// Optional [`ListingOptions`] for the to be created [`ListingTable`].
- ///
- /// See details on [`ListingTableConfig::with_listing_options`]
- pub options: Option<ListingOptions>,
- /// Tracks the source of the schema information
- schema_source: SchemaSource,
- /// Optional [`SchemaAdapterFactory`] for creating schema adapters
- schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
-    /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
- expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
-}
-
-impl ListingTableConfig {
- /// Creates new [`ListingTableConfig`] for reading the specified URL
- pub fn new(table_path: ListingTableUrl) -> Self {
- Self {
- table_paths: vec![table_path],
- ..Default::default()
- }
- }
-
- /// Creates new [`ListingTableConfig`] with multiple table paths.
- ///
-    /// See [`Self::infer_options`] for details on what happens with multiple paths
- pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
- Self {
- table_paths,
- ..Default::default()
- }
- }
-
- /// Returns the source of the schema for this configuration
- pub fn schema_source(&self) -> SchemaSource {
- self.schema_source
- }
- /// Set the `schema` for the overall [`ListingTable`]
- ///
- /// [`ListingTable`] will automatically coerce, when possible, the schema
- /// for individual files to match this schema.
- ///
- /// If a schema is not provided, it is inferred using
- /// [`Self::infer_schema`].
- ///
- /// If the schema is provided, it must contain only the fields in the file
- /// without the table partitioning columns.
- ///
- /// # Example: Specifying Table Schema
- /// ```rust
- /// # use std::sync::Arc;
-    /// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl};
-    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
-    /// # use arrow::datatypes::{Schema, Field, DataType};
-    /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
-    /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
- /// let schema = Arc::new(Schema::new(vec![
- /// Field::new("id", DataType::Int64, false),
- /// Field::new("name", DataType::Utf8, true),
- /// ]));
- ///
- /// let config = ListingTableConfig::new(table_paths)
- /// .with_listing_options(listing_options) // Set options first
- /// .with_schema(schema); // Then set schema
- /// ```
- pub fn with_schema(self, schema: SchemaRef) -> Self {
-        // Note: We preserve existing options state, but downstream code may expect
-        // options to be set. Consider calling with_listing_options() or infer_options()
- // before operations that require options to be present.
- debug_assert!(
- self.options.is_some() || cfg!(test),
- "ListingTableConfig::with_schema called without options set. \
-            Consider calling with_listing_options() or infer_options() first to avoid panics in downstream code."
- );
-
- Self {
- file_schema: Some(schema),
- schema_source: SchemaSource::Specified,
- ..self
- }
- }
-
- /// Add `listing_options` to [`ListingTableConfig`]
- ///
- /// If not provided, format and other options are inferred via
- /// [`Self::infer_options`].
- ///
- /// # Example: Configuring Parquet Files with Custom Options
- /// ```rust
- /// # use std::sync::Arc;
-    /// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl};
-    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
-    /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
- /// let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
- /// .with_file_extension(".parquet")
- /// .with_collect_stat(true);
- ///
- /// let config = ListingTableConfig::new(table_paths)
-    ///     .with_listing_options(options); // Configure file format and options
- /// ```
-    pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
-        // Note: This method properly sets options, but be aware that downstream
-        // methods like infer_schema() and try_new() require both schema and options
- // to be set to function correctly.
- debug_assert!(
- !self.table_paths.is_empty() || cfg!(test),
-            "ListingTableConfig::with_listing_options called without table_paths set. \
-             Consider calling new() or new_with_multi_paths() first to establish table paths."
- );
-
- Self {
- options: Some(listing_options),
- ..self
- }
- }
-
- /// Returns a tuple of `(file_extension, optional compression_extension)`
- ///
-    /// For example a path ending with blah.test.csv.gz returns `("csv", Some("gz"))`
- /// For example a path ending with blah.test.csv returns `("csv", None)`
- fn infer_file_extension_and_compression_type(
- path: &str,
- ) -> Result<(String, Option<String>)> {
- let mut exts = path.rsplit('.');
-
- let split = exts.next().unwrap_or("");
-
- let file_compression_type = FileCompressionType::from_str(split)
- .unwrap_or(FileCompressionType::UNCOMPRESSED);
-
- if file_compression_type.is_compressed() {
- let split2 = exts.next().unwrap_or("");
- Ok((split2.to_string(), Some(split.to_string())))
- } else {
- Ok((split.to_string(), None))
- }
- }
-
+use datafusion_catalog_listing::{ListingOptions, ListingTableConfig};
+use datafusion_common::{config_datafusion_err, internal_datafusion_err};
+use datafusion_session::Session;
+use futures::StreamExt;
+use std::collections::HashMap;
+
+/// This trait exists because the following inference methods only
+/// work for [`SessionState`] implementations of [`Session`].
+/// See [`ListingTableConfig`] for the remaining inference methods.
+#[async_trait]
+pub trait ListingTableConfigExt {
/// Infer `ListingOptions` based on `table_path` and file suffix.
///
/// The format is inferred based on the first `table_path`.
- pub async fn infer_options(self, state: &dyn Session) -> Result<Self> {
+ async fn infer_options(
+ self,
+ state: &dyn Session,
+ ) -> datafusion_common::Result<ListingTableConfig>;
+
+    /// Convenience method to call both [`Self::infer_options`] and [`ListingTableConfig::infer_schema`]
+ async fn infer(
+ self,
+ state: &dyn Session,
+ ) -> datafusion_common::Result<ListingTableConfig>;
+}
+
+#[async_trait]
+impl ListingTableConfigExt for ListingTableConfig {
+ async fn infer_options(
+ self,
+ state: &dyn Session,
+ ) -> datafusion_common::Result<ListingTableConfig> {
Review Comment:
I think this is an elegant solution
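A hedged usage sketch of how such an extension trait is consumed; the re-export path for `ListingTableConfigExt` is an assumption for illustration, not taken from this PR:

```rust
use datafusion::datasource::listing::{ListingTableConfig, ListingTableUrl};
// Bringing the trait into scope is what makes `infer_options` callable here;
// the exact re-export path is assumed, not confirmed by this diff.
use datafusion::datasource::listing::ListingTableConfigExt;
use datafusion::prelude::SessionContext;

async fn infer_example() -> datafusion_common::Result<ListingTableConfig> {
    let ctx = SessionContext::new();
    let table_path = ListingTableUrl::parse("file:///path/to/data")?;
    // Per the trait docs, inference only works for SessionState-backed
    // sessions, which is what SessionContext::state() provides.
    ListingTableConfig::new(table_path)
        .infer_options(&ctx.state())
        .await
}
```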
##########
datafusion/catalog-listing/src/mod.rs:
##########
@@ -24,4 +24,1491 @@
// https://github.com/apache/datafusion/issues/11143
#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]
+use crate::helpers::{expr_applicable_for_cols, pruned_partition_list};
+use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
+use async_trait::async_trait;
+use datafusion_catalog::Session;
+use datafusion_catalog::{ScanArgs, ScanResult, TableProvider};
+use datafusion_common::stats::Precision;
+use datafusion_common::{
+    config_err, internal_datafusion_err, internal_err, plan_err, project_schema,
+ Constraints, DataFusionError, SchemaExt, Statistics,
+};
+use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_compression_type::FileCompressionType;
+use datafusion_datasource::file_format::FileFormat;
+use datafusion_datasource::file_groups::FileGroup;
+use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
+use datafusion_datasource::file_sink_config::FileSinkConfig;
+use datafusion_datasource::schema_adapter::{
+ DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
+};
+pub use datafusion_datasource::{
+ compute_all_files_statistics, ListingTableUrl, PartitionedFile,
+};
+use datafusion_execution::cache::cache_manager::FileStatisticsCache;
+use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
+use datafusion_execution::config::SessionConfig;
+use datafusion_expr::dml::InsertOp;
+use datafusion_expr::execution_props::ExecutionProps;
+use datafusion_expr::{Expr, SortExpr, TableProviderFilterPushDown, TableType};
+use datafusion_physical_expr::create_lex_ordering;
+use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_plan::empty::EmptyExec;
+use datafusion_physical_plan::ExecutionPlan;
+use futures::{future, stream, Stream, StreamExt, TryStreamExt};
+use itertools::Itertools;
+use object_store::ObjectStore;
+use std::any::Any;
+use std::collections::HashMap;
+use std::str::FromStr;
+use std::sync::Arc;
+
pub mod helpers;
+
+/// Indicates the source of the schema for a [`ListingTable`]
+// PartialEq required for assert_eq! in tests
+#[derive(Debug, Clone, Copy, PartialEq, Default)]
+pub enum SchemaSource {
+ /// Schema is not yet set (initial state)
+ #[default]
+ Unset,
+ /// Schema was inferred from first table_path
+ Inferred,
+ /// Schema was specified explicitly via with_schema
+ Specified,
+}
+
+/// Configuration for creating a [`ListingTable`]
+///
+/// # Schema Evolution Support
+///
+/// This configuration supports schema evolution through the optional
+/// [`SchemaAdapterFactory`]. You might want to override the default factory when you need:
+///
+/// - **Type coercion requirements**: When you need custom logic for converting between
+///   different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8)
+/// - **Column mapping**: You need to map columns with a legacy name to a new name
+/// - **Custom handling of missing columns**: By default they are filled in with nulls, but you may e.g. want to fill them in with `0` or `""`.
+///
+/// If not specified, a [`DefaultSchemaAdapterFactory`] will be used, which handles
+/// basic schema compatibility cases.
+///
+#[derive(Debug, Clone, Default)]
+pub struct ListingTableConfig {
+ /// Paths on the `ObjectStore` for creating `ListingTable`.
+ /// They should share the same schema and object store.
+ pub table_paths: Vec<ListingTableUrl>,
+ /// Optional `SchemaRef` for the to be created `ListingTable`.
+ ///
+ /// See details on [`ListingTableConfig::with_schema`]
+ pub file_schema: Option<SchemaRef>,
+ /// Optional [`ListingOptions`] for the to be created [`ListingTable`].
+ ///
+ /// See details on [`ListingTableConfig::with_listing_options`]
+ pub options: Option<ListingOptions>,
+ /// Tracks the source of the schema information
+ schema_source: SchemaSource,
+ /// Optional [`SchemaAdapterFactory`] for creating schema adapters
+ schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+    /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
+ expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
+}
+
+impl ListingTableConfig {
+ /// Creates new [`ListingTableConfig`] for reading the specified URL
+ pub fn new(table_path: ListingTableUrl) -> Self {
+ Self {
+ table_paths: vec![table_path],
+ ..Default::default()
+ }
+ }
+
+ /// Creates new [`ListingTableConfig`] with multiple table paths.
+ ///
+    /// See `ListingTableConfigExt::infer_options` for details on what happens with multiple paths
+ pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
+ Self {
+ table_paths,
+ ..Default::default()
+ }
+ }
+
+ /// Returns the source of the schema for this configuration
+ pub fn schema_source(&self) -> SchemaSource {
+ self.schema_source
+ }
+ /// Set the `schema` for the overall [`ListingTable`]
+ ///
+ /// [`ListingTable`] will automatically coerce, when possible, the schema
+ /// for individual files to match this schema.
+ ///
+ /// If a schema is not provided, it is inferred using
+ /// [`Self::infer_schema`].
+ ///
+ /// If the schema is provided, it must contain only the fields in the file
+ /// without the table partitioning columns.
+ ///
+ /// # Example: Specifying Table Schema
+ /// ```rust
+ /// # use std::sync::Arc;
+    /// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions, ListingTableUrl};
+    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
+    /// # use arrow::datatypes::{Schema, Field, DataType};
+    /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
+    /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
+ /// let schema = Arc::new(Schema::new(vec![
+ /// Field::new("id", DataType::Int64, false),
+ /// Field::new("name", DataType::Utf8, true),
+ /// ]));
+ ///
+ /// let config = ListingTableConfig::new(table_paths)
+ /// .with_listing_options(listing_options) // Set options first
+ /// .with_schema(schema); // Then set schema
+ /// ```
+ pub fn with_schema(self, schema: SchemaRef) -> Self {
+        // Note: We preserve existing options state, but downstream code may expect
+        // options to be set. Consider calling with_listing_options() or infer_options()
+ // before operations that require options to be present.
+ debug_assert!(
+ self.options.is_some() || cfg!(test),
+ "ListingTableConfig::with_schema called without options set. \
+             Consider calling with_listing_options() or infer_options() first to avoid panics in downstream code."
+ );
+
+ Self {
+ file_schema: Some(schema),
+ schema_source: SchemaSource::Specified,
+ ..self
+ }
+ }
+
+ /// Add `listing_options` to [`ListingTableConfig`]
+ ///
+ /// If not provided, format and other options are inferred via
+ /// `ListingTableConfigExt::infer_options`.
+ ///
+ /// # Example: Configuring Parquet Files with Custom Options
+ /// ```rust
+ /// # use std::sync::Arc;
+    /// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions, ListingTableUrl};
+    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
+    /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
+ /// let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
+ /// .with_file_extension(".parquet")
+ /// .with_collect_stat(true);
+ ///
+ /// let config = ListingTableConfig::new(table_paths)
+    ///     .with_listing_options(options); // Configure file format and options
+ /// ```
+    pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
+        // Note: This method properly sets options, but be aware that downstream
+        // methods like infer_schema() and try_new() require both schema and options
+ // to be set to function correctly.
+ debug_assert!(
+ !self.table_paths.is_empty() || cfg!(test),
+            "ListingTableConfig::with_listing_options called without table_paths set. \
+             Consider calling new() or new_with_multi_paths() first to establish table paths."
+ );
+
+ Self {
+ options: Some(listing_options),
+ ..self
+ }
+ }
+
+ /// Returns a tuple of `(file_extension, optional compression_extension)`
+ ///
+    /// For example a path ending with blah.test.csv.gz returns `("csv", Some("gz"))`
+ /// For example a path ending with blah.test.csv returns `("csv", None)`
+ pub fn infer_file_extension_and_compression_type(
+ path: &str,
+ ) -> datafusion_common::Result<(String, Option<String>)> {
+ let mut exts = path.rsplit('.');
+
+ let split = exts.next().unwrap_or("");
+
+ let file_compression_type = FileCompressionType::from_str(split)
+ .unwrap_or(FileCompressionType::UNCOMPRESSED);
+
+ if file_compression_type.is_compressed() {
+ let split2 = exts.next().unwrap_or("");
+ Ok((split2.to_string(), Some(split.to_string())))
+ } else {
+ Ok((split.to_string(), None))
+ }
+ }
+
+ /// Infer the [`SchemaRef`] based on `table_path`s.
+ ///
+ /// This method infers the table schema using the first `table_path`.
+ /// See [`ListingOptions::infer_schema`] for more details
+ ///
+ /// # Errors
+ /// * if `self.options` is not set. See [`Self::with_listing_options`]
+ pub async fn infer_schema(
+ self,
+ state: &dyn Session,
+ ) -> datafusion_common::Result<Self> {
+ match self.options {
+ Some(options) => {
+ let ListingTableConfig {
+ table_paths,
+ file_schema,
+ options: _,
+ schema_source,
+ schema_adapter_factory,
+ expr_adapter_factory: physical_expr_adapter_factory,
+ } = self;
+
+ let (schema, new_schema_source) = match file_schema {
+                    Some(schema) => (schema, schema_source), // Keep existing source if schema exists
+ None => {
+ if let Some(url) = table_paths.first() {
+ (
+ options.infer_schema(state, url).await?,
+ SchemaSource::Inferred,
+ )
+ } else {
+ (Arc::new(Schema::empty()), SchemaSource::Inferred)
+ }
+ }
+ };
+
+ Ok(Self {
+ table_paths,
+ file_schema: Some(schema),
+ options: Some(options),
+ schema_source: new_schema_source,
+ schema_adapter_factory,
+ expr_adapter_factory: physical_expr_adapter_factory,
+ })
+ }
+            None => internal_err!("No `ListingOptions` set for inferring schema"),
+ }
+ }
+
+ /// Infer the partition columns from `table_paths`.
+ ///
+ /// # Errors
+ /// * if `self.options` is not set. See [`Self::with_listing_options`]
+ pub async fn infer_partitions_from_path(
+ self,
+ state: &dyn Session,
+ ) -> datafusion_common::Result<Self> {
+ match self.options {
+ Some(options) => {
+ let Some(url) = self.table_paths.first() else {
+ return config_err!("No table path found");
+ };
+ let partitions = options
+ .infer_partitions(state, url)
+ .await?
+ .into_iter()
+ .map(|col_name| {
+ (
+ col_name,
+ DataType::Dictionary(
+ Box::new(DataType::UInt16),
+ Box::new(DataType::Utf8),
+ ),
+ )
+ })
+ .collect::<Vec<_>>();
+ let options = options.with_table_partition_cols(partitions);
+ Ok(Self {
+ table_paths: self.table_paths,
+ file_schema: self.file_schema,
+ options: Some(options),
+ schema_source: self.schema_source,
+ schema_adapter_factory: self.schema_adapter_factory,
+ expr_adapter_factory: self.expr_adapter_factory,
+ })
+ }
+            None => config_err!("No `ListingOptions` set for inferring schema"),
+ }
+ }
+
+ /// Set the [`SchemaAdapterFactory`] for the [`ListingTable`]
+ ///
+ /// The schema adapter factory is used to create schema adapters that can
+ /// handle schema evolution and type conversions when reading files with
+ /// different schemas than the table schema.
+ ///
+ /// If not provided, a default schema adapter factory will be used.
+ ///
+ /// # Example: Custom Schema Adapter for Type Coercion
+ /// ```rust
+ /// # use std::sync::Arc;
+    /// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions, ListingTableUrl};
+    /// # use datafusion_datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter};
+ /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
+ /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
+ /// #
+ /// # #[derive(Debug)]
+ /// # struct MySchemaAdapterFactory;
+ /// # impl SchemaAdapterFactory for MySchemaAdapterFactory {
+    /// #    fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+ /// # unimplemented!()
+ /// # }
+ /// # }
+    /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
+    /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
+    /// # let table_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
+ /// let config = ListingTableConfig::new(table_paths)
+ /// .with_listing_options(listing_options)
+ /// .with_schema(table_schema)
+ /// .with_schema_adapter_factory(Arc::new(MySchemaAdapterFactory));
+ /// ```
+ pub fn with_schema_adapter_factory(
+ self,
+ schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+ ) -> Self {
+ Self {
+ schema_adapter_factory: Some(schema_adapter_factory),
+ ..self
+ }
+ }
+
+ /// Get the [`SchemaAdapterFactory`] for this configuration
+    pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
+ self.schema_adapter_factory.as_ref()
+ }
+
+ /// Set the [`PhysicalExprAdapterFactory`] for the [`ListingTable`]
+ ///
+    /// The expression adapter factory is used to create physical expression adapters that can
+    /// handle schema evolution and type conversions when evaluating expressions
+ /// with different schemas than the table schema.
+ ///
+    /// If not provided, a default physical expression adapter factory will be used unless a custom
+    /// `SchemaAdapterFactory` is set, in which case only the `SchemaAdapterFactory` will be used.
+ ///
+    /// See <https://github.com/apache/datafusion/issues/16800> for details on this transition.
+ pub fn with_expr_adapter_factory(
+ self,
+ expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
+ ) -> Self {
+ Self {
+ expr_adapter_factory: Some(expr_adapter_factory),
+ ..self
+ }
+ }
+}
+
+/// Options for creating a [`ListingTable`]
+#[derive(Clone, Debug)]
+pub struct ListingOptions {
Review Comment:
perhaps in `catalog-listing/src/options.rs`
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -15,226 +15,40 @@
// specific language governing permissions and limitations
// under the License.
-//! The table implementation.
-
-use super::{
- helpers::{expr_applicable_for_cols, pruned_partition_list},
- ListingTableUrl, PartitionedFile,
-};
-use crate::{
-    datasource::file_format::{file_compression_type::FileCompressionType, FileFormat},
- datasource::physical_plan::FileSinkConfig,
- execution::context::SessionState,
-};
-use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
-use arrow_schema::Schema;
+use crate::execution::SessionState;
use async_trait::async_trait;
-use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
-use datafusion_common::{
-    config_datafusion_err, config_err, internal_datafusion_err, internal_err, plan_err,
-    project_schema, stats::Precision, Constraints, DataFusionError, Result, SchemaExt,
-};
-use datafusion_datasource::{
- compute_all_files_statistics,
- file::FileSource,
- file_groups::FileGroup,
- file_scan_config::{FileScanConfig, FileScanConfigBuilder},
-    schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory},
-};
-use datafusion_execution::{
-    cache::{cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache},
- config::SessionConfig,
-};
-use datafusion_expr::execution_props::ExecutionProps;
-use datafusion_expr::{
- dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType,
-};
-use datafusion_physical_expr::create_lex_ordering;
-use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
-use datafusion_physical_expr_common::sort_expr::LexOrdering;
-use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
-use futures::{future, stream, Stream, StreamExt, TryStreamExt};
-use itertools::Itertools;
-use object_store::ObjectStore;
-use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc};
-
-/// Indicates the source of the schema for a [`ListingTable`]
-// PartialEq required for assert_eq! in tests
-#[derive(Debug, Clone, Copy, PartialEq, Default)]
-pub enum SchemaSource {
- /// Schema is not yet set (initial state)
- #[default]
- Unset,
- /// Schema was inferred from first table_path
- Inferred,
- /// Schema was specified explicitly via with_schema
- Specified,
-}
-
-/// Configuration for creating a [`ListingTable`]
-///
-/// # Schema Evolution Support
-///
-/// This configuration supports schema evolution through the optional
-/// [`SchemaAdapterFactory`]. You might want to override the default factory when you need:
-///
-/// - **Type coercion requirements**: When you need custom logic for converting between
-///   different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8)
-/// - **Column mapping**: You need to map columns with a legacy name to a new name
-/// - **Custom handling of missing columns**: By default they are filled in with nulls, but you may e.g. want to fill them in with `0` or `""`.
-///
-/// If not specified, a [`DefaultSchemaAdapterFactory`] will be used, which handles
-/// basic schema compatibility cases.
-///
-#[derive(Debug, Clone, Default)]
-pub struct ListingTableConfig {
- /// Paths on the `ObjectStore` for creating `ListingTable`.
- /// They should share the same schema and object store.
- pub table_paths: Vec<ListingTableUrl>,
- /// Optional `SchemaRef` for the to be created `ListingTable`.
- ///
- /// See details on [`ListingTableConfig::with_schema`]
- pub file_schema: Option<SchemaRef>,
- /// Optional [`ListingOptions`] for the to be created [`ListingTable`].
- ///
- /// See details on [`ListingTableConfig::with_listing_options`]
- pub options: Option<ListingOptions>,
- /// Tracks the source of the schema information
- schema_source: SchemaSource,
- /// Optional [`SchemaAdapterFactory`] for creating schema adapters
- schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
-    /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
- expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
-}
-
-impl ListingTableConfig {
- /// Creates new [`ListingTableConfig`] for reading the specified URL
- pub fn new(table_path: ListingTableUrl) -> Self {
- Self {
- table_paths: vec![table_path],
- ..Default::default()
- }
- }
-
- /// Creates new [`ListingTableConfig`] with multiple table paths.
- ///
-    /// See [`Self::infer_options`] for details on what happens with multiple paths
- pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
- Self {
- table_paths,
- ..Default::default()
- }
- }
-
- /// Returns the source of the schema for this configuration
- pub fn schema_source(&self) -> SchemaSource {
- self.schema_source
- }
- /// Set the `schema` for the overall [`ListingTable`]
- ///
- /// [`ListingTable`] will automatically coerce, when possible, the schema
- /// for individual files to match this schema.
- ///
- /// If a schema is not provided, it is inferred using
- /// [`Self::infer_schema`].
- ///
- /// If the schema is provided, it must contain only the fields in the file
- /// without the table partitioning columns.
- ///
- /// # Example: Specifying Table Schema
- /// ```rust
- /// # use std::sync::Arc;
-    /// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl};
-    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
-    /// # use arrow::datatypes::{Schema, Field, DataType};
-    /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
-    /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
- /// let schema = Arc::new(Schema::new(vec![
- /// Field::new("id", DataType::Int64, false),
- /// Field::new("name", DataType::Utf8, true),
- /// ]));
- ///
- /// let config = ListingTableConfig::new(table_paths)
- /// .with_listing_options(listing_options) // Set options first
- /// .with_schema(schema); // Then set schema
- /// ```
- pub fn with_schema(self, schema: SchemaRef) -> Self {
-        // Note: We preserve existing options state, but downstream code may expect
-        // options to be set. Consider calling with_listing_options() or infer_options()
- // before operations that require options to be present.
- debug_assert!(
- self.options.is_some() || cfg!(test),
- "ListingTableConfig::with_schema called without options set. \
-            Consider calling with_listing_options() or infer_options() first to avoid panics in downstream code."
- );
-
- Self {
- file_schema: Some(schema),
- schema_source: SchemaSource::Specified,
- ..self
- }
- }
-
- /// Add `listing_options` to [`ListingTableConfig`]
- ///
- /// If not provided, format and other options are inferred via
- /// [`Self::infer_options`].
- ///
- /// # Example: Configuring Parquet Files with Custom Options
- /// ```rust
- /// # use std::sync::Arc;
-    /// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl};
-    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
-    /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
- /// let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
- /// .with_file_extension(".parquet")
- /// .with_collect_stat(true);
- ///
- /// let config = ListingTableConfig::new(table_paths)
-    ///     .with_listing_options(options); // Configure file format and options
- /// ```
-    pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
-        // Note: This method properly sets options, but be aware that downstream
-        // methods like infer_schema() and try_new() require both schema and options
- // to be set to function correctly.
- debug_assert!(
- !self.table_paths.is_empty() || cfg!(test),
-            "ListingTableConfig::with_listing_options called without table_paths set. \
-             Consider calling new() or new_with_multi_paths() first to establish table paths."
- );
-
- Self {
- options: Some(listing_options),
- ..self
- }
- }
-
- /// Returns a tuple of `(file_extension, optional compression_extension)`
- ///
-    /// For example a path ending with blah.test.csv.gz returns `("csv", Some("gz"))`
- /// For example a path ending with blah.test.csv returns `("csv", None)`
- fn infer_file_extension_and_compression_type(
- path: &str,
- ) -> Result<(String, Option<String>)> {
- let mut exts = path.rsplit('.');
-
- let split = exts.next().unwrap_or("");
-
- let file_compression_type = FileCompressionType::from_str(split)
- .unwrap_or(FileCompressionType::UNCOMPRESSED);
-
- if file_compression_type.is_compressed() {
- let split2 = exts.next().unwrap_or("");
- Ok((split2.to_string(), Some(split.to_string())))
- } else {
- Ok((split.to_string(), None))
- }
- }
-
+use datafusion_catalog_listing::{ListingOptions, ListingTableConfig};
+use datafusion_common::{config_datafusion_err, internal_datafusion_err};
+use datafusion_session::Session;
+use futures::StreamExt;
+use std::collections::HashMap;
+
+/// This trait exists because the following inference methods only
Review Comment:
```suggestion
/// Extension trait for [`ListingTableConfig`] that supports inferring schemas
///
/// This trait exists because the following inference methods only
```
##########
datafusion/catalog-listing/Cargo.toml:
##########
@@ -39,18 +39,24 @@ datafusion-datasource = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
+datafusion-physical-expr-adapter = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
futures = { workspace = true }
+itertools = { workspace = true }
log = { workspace = true }
object_store = { workspace = true }
tokio = { workspace = true }
[dev-dependencies]
+datafusion-datasource-parquet = { workspace = true }
Review Comment:
this is unfortunate but I don't think it's terrible.