timsaucer commented on code in PR #18080:
URL: https://github.com/apache/datafusion/pull/18080#discussion_r2433630799


##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -15,226 +15,40 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! The table implementation.
-
-use super::{
-    helpers::{expr_applicable_for_cols, pruned_partition_list},
-    ListingTableUrl, PartitionedFile,
-};
-use crate::{
-    datasource::file_format::{file_compression_type::FileCompressionType, 
FileFormat},
-    datasource::physical_plan::FileSinkConfig,
-    execution::context::SessionState,
-};
-use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
-use arrow_schema::Schema;
+use crate::execution::SessionState;
 use async_trait::async_trait;
-use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
-use datafusion_common::{
-    config_datafusion_err, config_err, internal_datafusion_err, internal_err, 
plan_err,
-    project_schema, stats::Precision, Constraints, DataFusionError, Result, 
SchemaExt,
-};
-use datafusion_datasource::{
-    compute_all_files_statistics,
-    file::FileSource,
-    file_groups::FileGroup,
-    file_scan_config::{FileScanConfig, FileScanConfigBuilder},
-    schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter, 
SchemaAdapterFactory},
-};
-use datafusion_execution::{
-    cache::{cache_manager::FileStatisticsCache, 
cache_unit::DefaultFileStatisticsCache},
-    config::SessionConfig,
-};
-use datafusion_expr::execution_props::ExecutionProps;
-use datafusion_expr::{
-    dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType,
-};
-use datafusion_physical_expr::create_lex_ordering;
-use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
-use datafusion_physical_expr_common::sort_expr::LexOrdering;
-use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
-use futures::{future, stream, Stream, StreamExt, TryStreamExt};
-use itertools::Itertools;
-use object_store::ObjectStore;
-use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc};
-
-/// Indicates the source of the schema for a [`ListingTable`]
-// PartialEq required for assert_eq! in tests
-#[derive(Debug, Clone, Copy, PartialEq, Default)]
-pub enum SchemaSource {
-    /// Schema is not yet set (initial state)
-    #[default]
-    Unset,
-    /// Schema was inferred from first table_path
-    Inferred,
-    /// Schema was specified explicitly via with_schema
-    Specified,
-}
-
-/// Configuration for creating a [`ListingTable`]
-///
-/// # Schema Evolution Support
-///
-/// This configuration supports schema evolution through the optional
-/// [`SchemaAdapterFactory`]. You might want to override the default factory 
when you need:
-///
-/// - **Type coercion requirements**: When you need custom logic for 
converting between
-///   different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8)
-/// - **Column mapping**: You need to map columns with a legacy name to a new 
name
-/// - **Custom handling of missing columns**: By default they are filled in 
with nulls, but you may e.g. want to fill them in with `0` or `""`.
-///
-/// If not specified, a [`DefaultSchemaAdapterFactory`] will be used, which 
handles
-/// basic schema compatibility cases.
-///
-#[derive(Debug, Clone, Default)]
-pub struct ListingTableConfig {
-    /// Paths on the `ObjectStore` for creating `ListingTable`.
-    /// They should share the same schema and object store.
-    pub table_paths: Vec<ListingTableUrl>,
-    /// Optional `SchemaRef` for the to be created `ListingTable`.
-    ///
-    /// See details on [`ListingTableConfig::with_schema`]
-    pub file_schema: Option<SchemaRef>,
-    /// Optional [`ListingOptions`] for the to be created [`ListingTable`].
-    ///
-    /// See details on [`ListingTableConfig::with_listing_options`]
-    pub options: Option<ListingOptions>,
-    /// Tracks the source of the schema information
-    schema_source: SchemaSource,
-    /// Optional [`SchemaAdapterFactory`] for creating schema adapters
-    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
-    /// Optional [`PhysicalExprAdapterFactory`] for creating physical 
expression adapters
-    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
-}
-
-impl ListingTableConfig {
-    /// Creates new [`ListingTableConfig`] for reading the specified URL
-    pub fn new(table_path: ListingTableUrl) -> Self {
-        Self {
-            table_paths: vec![table_path],
-            ..Default::default()
-        }
-    }
-
-    /// Creates new [`ListingTableConfig`] with multiple table paths.
-    ///
-    /// See [`Self::infer_options`] for details on what happens with multiple 
paths
-    pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
-        Self {
-            table_paths,
-            ..Default::default()
-        }
-    }
-
-    /// Returns the source of the schema for this configuration
-    pub fn schema_source(&self) -> SchemaSource {
-        self.schema_source
-    }
-    /// Set the `schema` for the overall [`ListingTable`]
-    ///
-    /// [`ListingTable`] will automatically coerce, when possible, the schema
-    /// for individual files to match this schema.
-    ///
-    /// If a schema is not provided, it is inferred using
-    /// [`Self::infer_schema`].
-    ///
-    /// If the schema is provided, it must contain only the fields in the file
-    /// without the table partitioning columns.
-    ///
-    /// # Example: Specifying Table Schema
-    /// ```rust
-    /// # use std::sync::Arc;
-    /// # use datafusion::datasource::listing::{ListingTableConfig, 
ListingOptions, ListingTableUrl};
-    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
-    /// # use arrow::datatypes::{Schema, Field, DataType};
-    /// # let table_paths = 
ListingTableUrl::parse("file:///path/to/data").unwrap();
-    /// # let listing_options = 
ListingOptions::new(Arc::new(ParquetFormat::default()));
-    /// let schema = Arc::new(Schema::new(vec![
-    ///     Field::new("id", DataType::Int64, false),
-    ///     Field::new("name", DataType::Utf8, true),
-    /// ]));
-    ///
-    /// let config = ListingTableConfig::new(table_paths)
-    ///     .with_listing_options(listing_options)  // Set options first
-    ///     .with_schema(schema);                    // Then set schema
-    /// ```
-    pub fn with_schema(self, schema: SchemaRef) -> Self {
-        // Note: We preserve existing options state, but downstream code may 
expect
-        // options to be set. Consider calling with_listing_options() or 
infer_options()
-        // before operations that require options to be present.
-        debug_assert!(
-            self.options.is_some() || cfg!(test),
-            "ListingTableConfig::with_schema called without options set. \
-             Consider calling with_listing_options() or infer_options() first 
to avoid panics in downstream code."
-        );
-
-        Self {
-            file_schema: Some(schema),
-            schema_source: SchemaSource::Specified,
-            ..self
-        }
-    }
-
-    /// Add `listing_options` to [`ListingTableConfig`]
-    ///
-    /// If not provided, format and other options are inferred via
-    /// [`Self::infer_options`].
-    ///
-    /// # Example: Configuring Parquet Files with Custom Options
-    /// ```rust
-    /// # use std::sync::Arc;
-    /// # use datafusion::datasource::listing::{ListingTableConfig, 
ListingOptions, ListingTableUrl};
-    /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
-    /// # let table_paths = 
ListingTableUrl::parse("file:///path/to/data").unwrap();
-    /// let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
-    ///     .with_file_extension(".parquet")
-    ///     .with_collect_stat(true);
-    ///
-    /// let config = ListingTableConfig::new(table_paths)
-    ///     .with_listing_options(options);  // Configure file format and 
options
-    /// ```
-    pub fn with_listing_options(self, listing_options: ListingOptions) -> Self 
{
-        // Note: This method properly sets options, but be aware that 
downstream
-        // methods like infer_schema() and try_new() require both schema and 
options
-        // to be set to function correctly.
-        debug_assert!(
-            !self.table_paths.is_empty() || cfg!(test),
-            "ListingTableConfig::with_listing_options called without 
table_paths set. \
-             Consider calling new() or new_with_multi_paths() first to 
establish table paths."
-        );
-
-        Self {
-            options: Some(listing_options),
-            ..self
-        }
-    }
-
-    /// Returns a tuple of `(file_extension, optional compression_extension)`
-    ///
-    /// For example a path ending with blah.test.csv.gz returns `("csv", 
Some("gz"))`
-    /// For example a path ending with blah.test.csv returns `("csv", None)`
-    fn infer_file_extension_and_compression_type(
-        path: &str,
-    ) -> Result<(String, Option<String>)> {
-        let mut exts = path.rsplit('.');
-
-        let split = exts.next().unwrap_or("");
-
-        let file_compression_type = FileCompressionType::from_str(split)
-            .unwrap_or(FileCompressionType::UNCOMPRESSED);
-
-        if file_compression_type.is_compressed() {
-            let split2 = exts.next().unwrap_or("");
-            Ok((split2.to_string(), Some(split.to_string())))
-        } else {
-            Ok((split.to_string(), None))
-        }
-    }
-
+use datafusion_catalog_listing::{ListingOptions, ListingTableConfig};
+use datafusion_common::{config_datafusion_err, internal_datafusion_err};
+use datafusion_session::Session;
+use futures::StreamExt;
+use std::collections::HashMap;
+
+/// This trait exists because the following inference methods only
+/// work for [`SessionState`] implementations of [`Session`].
+/// See [`ListingTableConfig`] for the remaining inference methods.
+#[async_trait]
+pub trait ListingTableConfigExt {
     /// Infer `ListingOptions` based on `table_path` and file suffix.
     ///
     /// The format is inferred based on the first `table_path`.
-    pub async fn infer_options(self, state: &dyn Session) -> Result<Self> {
+    async fn infer_options(
+        self,
+        state: &dyn Session,
+    ) -> datafusion_common::Result<ListingTableConfig>;
+
+    /// Convenience method to call both [`Self::infer_options`] and 
[`ListingTableConfig::infer_schema`]
+    async fn infer(
+        self,
+        state: &dyn Session,
+    ) -> datafusion_common::Result<ListingTableConfig>;
+}
+
+#[async_trait]
+impl ListingTableConfigExt for ListingTableConfig {
+    async fn infer_options(
+        self,
+        state: &dyn Session,
+    ) -> datafusion_common::Result<ListingTableConfig> {

Review Comment:
   For reviewers: this is probably the only controversial change in this PR. As 
per the PR description, I split out these two methods into an extension trait 
that lives within `core` because of the use of `SessionState`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to