timsaucer commented on code in PR #18080:
URL: https://github.com/apache/datafusion/pull/18080#discussion_r2434145879
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -1638,15 +191,18 @@ mod tests {
let ctx = SessionContext::new();
let testdata = datafusion_test_data();
let filename = format!("{testdata}/aggregate_simple.csv");
- let table_path = ListingTableUrl::parse(filename).unwrap();
+ let table_path = ListingTableUrl::parse(filename)?;
// Test default schema source
let config = ListingTableConfig::new(table_path.clone());
assert_eq!(config.schema_source(), SchemaSource::Unset);
// Test schema source after setting a schema explicitly
let provided_schema = create_test_schema();
- let config_with_schema =
- config.clone().with_schema(provided_schema.clone());
+ let config_with_schema = config
+ .clone()
+ .with_listing_options(mock_listing_config_options())
Review Comment:
Changed in latest commit.
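
For context on this thread: the updated test sets listing options before the schema, matching the new `debug_assert!` in `with_schema`. A minimal sketch of that call order follows; the path, format, and field are illustrative stand-ins, not the test's actual helpers:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;

fn build_config() -> Result<ListingTableConfig> {
    let table_path = ListingTableUrl::parse("file:///path/to/data")?;
    let schema = Arc::new(Schema::new(vec![Field::new(
        "c1",
        DataType::Float32,
        false,
    )]));
    // Options first, then schema: with_schema() now debug-asserts that
    // options are already present, so the ordering matters.
    Ok(ListingTableConfig::new(table_path)
        .with_listing_options(ListingOptions::new(Arc::new(
            ParquetFormat::default(),
        )))
        .with_schema(schema))
}
```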
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -15,226 +15,40 @@
// specific language governing permissions and limitations
// under the License.
-//! The table implementation.
-
-use super::{
- helpers::{expr_applicable_for_cols, pruned_partition_list},
- ListingTableUrl, PartitionedFile,
-};
-use crate::{
- datasource::file_format::{file_compression_type::FileCompressionType, FileFormat},
- datasource::physical_plan::FileSinkConfig,
- execution::context::SessionState,
-};
-use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
-use arrow_schema::Schema;
+use crate::execution::SessionState;
use async_trait::async_trait;
-use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
-use datafusion_common::{
- config_datafusion_err, config_err, internal_datafusion_err, internal_err, plan_err,
- project_schema, stats::Precision, Constraints, DataFusionError, Result, SchemaExt,
-};
-use datafusion_datasource::{
- compute_all_files_statistics,
- file::FileSource,
- file_groups::FileGroup,
- file_scan_config::{FileScanConfig, FileScanConfigBuilder},
- schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory},
-};
-use datafusion_execution::{
- cache::{cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache},
- config::SessionConfig,
-};
-use datafusion_expr::execution_props::ExecutionProps;
-use datafusion_expr::{
- dml::InsertOp, Expr, SortExpr, TableProviderFilterPushDown, TableType,
-};
-use datafusion_physical_expr::create_lex_ordering;
-use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
-use datafusion_physical_expr_common::sort_expr::LexOrdering;
-use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
-use futures::{future, stream, Stream, StreamExt, TryStreamExt};
-use itertools::Itertools;
-use object_store::ObjectStore;
-use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc};
-
-/// Indicates the source of the schema for a [`ListingTable`]
-// PartialEq required for assert_eq! in tests
-#[derive(Debug, Clone, Copy, PartialEq, Default)]
-pub enum SchemaSource {
- /// Schema is not yet set (initial state)
- #[default]
- Unset,
- /// Schema was inferred from first table_path
- Inferred,
- /// Schema was specified explicitly via with_schema
- Specified,
-}
-
-/// Configuration for creating a [`ListingTable`]
-///
-/// # Schema Evolution Support
-///
-/// This configuration supports schema evolution through the optional
-/// [`SchemaAdapterFactory`]. You might want to override the default factory when you need:
-///
-/// - **Type coercion requirements**: When you need custom logic for converting between
-/// different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8)
-/// - **Column mapping**: You need to map columns with a legacy name to a new name
-/// - **Custom handling of missing columns**: By default they are filled in with nulls, but you may e.g. want to fill them in with `0` or `""`.
-///
-/// If not specified, a [`DefaultSchemaAdapterFactory`] will be used, which handles
-/// basic schema compatibility cases.
-///
-#[derive(Debug, Clone, Default)]
-pub struct ListingTableConfig {
- /// Paths on the `ObjectStore` for creating `ListingTable`.
- /// They should share the same schema and object store.
- pub table_paths: Vec<ListingTableUrl>,
- /// Optional `SchemaRef` for the to be created `ListingTable`.
- ///
- /// See details on [`ListingTableConfig::with_schema`]
- pub file_schema: Option<SchemaRef>,
- /// Optional [`ListingOptions`] for the to be created [`ListingTable`].
- ///
- /// See details on [`ListingTableConfig::with_listing_options`]
- pub options: Option<ListingOptions>,
- /// Tracks the source of the schema information
- schema_source: SchemaSource,
- /// Optional [`SchemaAdapterFactory`] for creating schema adapters
- schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
- /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
- expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
-}
-
-impl ListingTableConfig {
- /// Creates new [`ListingTableConfig`] for reading the specified URL
- pub fn new(table_path: ListingTableUrl) -> Self {
- Self {
- table_paths: vec![table_path],
- ..Default::default()
- }
- }
-
- /// Creates new [`ListingTableConfig`] with multiple table paths.
- ///
- /// See [`Self::infer_options`] for details on what happens with multiple paths
- pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
- Self {
- table_paths,
- ..Default::default()
- }
- }
-
- /// Returns the source of the schema for this configuration
- pub fn schema_source(&self) -> SchemaSource {
- self.schema_source
- }
- /// Set the `schema` for the overall [`ListingTable`]
- ///
- /// [`ListingTable`] will automatically coerce, when possible, the schema
- /// for individual files to match this schema.
- ///
- /// If a schema is not provided, it is inferred using
- /// [`Self::infer_schema`].
- ///
- /// If the schema is provided, it must contain only the fields in the file
- /// without the table partitioning columns.
- ///
- /// # Example: Specifying Table Schema
- /// ```rust
- /// # use std::sync::Arc;
- /// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl};
- /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
- /// # use arrow::datatypes::{Schema, Field, DataType};
- /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
- /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
- /// let schema = Arc::new(Schema::new(vec![
- /// Field::new("id", DataType::Int64, false),
- /// Field::new("name", DataType::Utf8, true),
- /// ]));
- ///
- /// let config = ListingTableConfig::new(table_paths)
- /// .with_listing_options(listing_options) // Set options first
- /// .with_schema(schema); // Then set schema
- /// ```
- pub fn with_schema(self, schema: SchemaRef) -> Self {
- // Note: We preserve existing options state, but downstream code may expect
- // options to be set. Consider calling with_listing_options() or infer_options()
- // before operations that require options to be present.
- debug_assert!(
- self.options.is_some() || cfg!(test),
- "ListingTableConfig::with_schema called without options set. \
- Consider calling with_listing_options() or infer_options() first to avoid panics in downstream code."
- );
-
- Self {
- file_schema: Some(schema),
- schema_source: SchemaSource::Specified,
- ..self
- }
- }
-
- /// Add `listing_options` to [`ListingTableConfig`]
- ///
- /// If not provided, format and other options are inferred via
- /// [`Self::infer_options`].
- ///
- /// # Example: Configuring Parquet Files with Custom Options
- /// ```rust
- /// # use std::sync::Arc;
- /// # use datafusion::datasource::listing::{ListingTableConfig, ListingOptions, ListingTableUrl};
- /// # use datafusion::datasource::file_format::parquet::ParquetFormat;
- /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
- /// let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
- /// .with_file_extension(".parquet")
- /// .with_collect_stat(true);
- ///
- /// let config = ListingTableConfig::new(table_paths)
- /// .with_listing_options(options); // Configure file format and options
- /// ```
- pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
- // Note: This method properly sets options, but be aware that downstream
- // methods like infer_schema() and try_new() require both schema and options
- // to be set to function correctly.
- debug_assert!(
- !self.table_paths.is_empty() || cfg!(test),
- "ListingTableConfig::with_listing_options called without
table_paths set. \
- Consider calling new() or new_with_multi_paths() first to
establish table paths."
- );
-
- Self {
- options: Some(listing_options),
- ..self
- }
- }
-
- /// Returns a tuple of `(file_extension, optional compression_extension)`
- ///
- /// For example a path ending with blah.test.csv.gz returns `("csv", Some("gz"))`
- /// For example a path ending with blah.test.csv returns `("csv", None)`
- fn infer_file_extension_and_compression_type(
- path: &str,
- ) -> Result<(String, Option<String>)> {
- let mut exts = path.rsplit('.');
-
- let split = exts.next().unwrap_or("");
-
- let file_compression_type = FileCompressionType::from_str(split)
- .unwrap_or(FileCompressionType::UNCOMPRESSED);
-
- if file_compression_type.is_compressed() {
- let split2 = exts.next().unwrap_or("");
- Ok((split2.to_string(), Some(split.to_string())))
- } else {
- Ok((split.to_string(), None))
- }
- }
-
+use datafusion_catalog_listing::{ListingOptions, ListingTableConfig};
+use datafusion_common::{config_datafusion_err, internal_datafusion_err};
+use datafusion_session::Session;
+use futures::StreamExt;
+use std::collections::HashMap;
+
+/// This trait exists because the following inference methods only
Review Comment:
Changed in latest commit.
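
For context on the move: the `infer_file_extension_and_compression_type` helper removed above keeps the behavior its doc comment describes. A standalone sketch of that rule, reconstructed from the removed body rather than copied from the helper's new home (the `Result` wrapper is dropped here since the body never errors):

```rust
use std::str::FromStr;

use datafusion::datasource::file_format::file_compression_type::FileCompressionType;

/// "blah.test.csv.gz" -> ("csv", Some("gz")); "blah.test.csv" -> ("csv", None)
fn split_extension(path: &str) -> (String, Option<String>) {
    let mut exts = path.rsplit('.');
    let last = exts.next().unwrap_or("");
    // A recognized compression suffix (e.g. "gz") means the real format
    // extension sits one segment further left; anything unrecognized is
    // treated as uncompressed and taken as the format itself.
    let compression = FileCompressionType::from_str(last)
        .unwrap_or(FileCompressionType::UNCOMPRESSED);
    if compression.is_compressed() {
        (exts.next().unwrap_or("").to_string(), Some(last.to_string()))
    } else {
        (last.to_string(), None)
    }
}
```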
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]