This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2741c60ad9 Use compression type in CSV file suffices (#16609)
2741c60ad9 is described below

commit 2741c60ad9d637e5014d201802abb25562449491
Author: theirix <[email protected]>
AuthorDate: Mon Jul 7 12:28:20 2025 +0100

    Use compression type in CSV file suffices (#16609)
    
    * Use compression type in file suffices
    
    - Add FileFormat::compression_type method
    - Specify meaningful values for CSV only
    - Use compression type as a part of extension for files
    
    * Add CSV tests
    
    * Add glob dep, use env logging
    
    * Use a glob pattern with compression suffix for TableProviderFactory
    
    * Conform to clippy standards
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 Cargo.lock                                         |   1 +
 datafusion-examples/examples/custom_file_format.rs |   4 +
 datafusion-examples/examples/dataframe.rs          |   1 +
 datafusion/core/Cargo.toml                         |   1 +
 .../core/src/datasource/file_format/arrow.rs       |   4 +
 datafusion/core/src/datasource/file_format/csv.rs  |  81 ++++++++++++++
 .../core/src/datasource/listing_table_factory.rs   | 117 ++++++++++++++++++++-
 datafusion/core/src/physical_planner.rs            |  10 +-
 datafusion/datasource-avro/src/file_format.rs      |   4 +
 datafusion/datasource-csv/src/file_format.rs       |   4 +
 datafusion/datasource-json/src/file_format.rs      |   4 +
 datafusion/datasource-parquet/src/file_format.rs   |   4 +
 datafusion/datasource/src/file_format.rs           |   3 +
 13 files changed, 234 insertions(+), 4 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 956818412c..677897516a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1872,6 +1872,7 @@ dependencies = [
  "env_logger",
  "flate2",
  "futures",
+ "glob",
  "insta",
  "itertools 0.14.0",
  "log",
diff --git a/datafusion-examples/examples/custom_file_format.rs 
b/datafusion-examples/examples/custom_file_format.rs
index e9a4d71b16..67fe642fd4 100644
--- a/datafusion-examples/examples/custom_file_format.rs
+++ b/datafusion-examples/examples/custom_file_format.rs
@@ -81,6 +81,10 @@ impl FileFormat for TSVFileFormat {
         }
     }
 
+    fn compression_type(&self) -> Option<FileCompressionType> {
+        None
+    }
+
     async fn infer_schema(
         &self,
         state: &dyn Session,
diff --git a/datafusion-examples/examples/dataframe.rs 
b/datafusion-examples/examples/dataframe.rs
index 57a28aeca0..a5ee571a14 100644
--- a/datafusion-examples/examples/dataframe.rs
+++ b/datafusion-examples/examples/dataframe.rs
@@ -59,6 +59,7 @@ use tempfile::tempdir;
 /// * [query_to_date]: execute queries against parquet files
 #[tokio::main]
 async fn main() -> Result<()> {
+    env_logger::init();
     // The SessionContext is the main high level API for interacting with 
DataFusion
     let ctx = SessionContext::new();
     read_parquet(&ctx).await?;
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 9747f44240..ec7ee07d7f 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -154,6 +154,7 @@ datafusion-macros = { workspace = true }
 datafusion-physical-optimizer = { workspace = true }
 doc-comment = { workspace = true }
 env_logger = { workspace = true }
+glob = { version = "0.3.0" }
 insta = { workspace = true }
 paste = "^1.0"
 rand = { workspace = true, features = ["small_rng"] }
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs 
b/datafusion/core/src/datasource/file_format/arrow.rs
index e9bd1d2cf3..19e884601c 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -134,6 +134,10 @@ impl FileFormat for ArrowFormat {
         }
     }
 
+    fn compression_type(&self) -> Option<FileCompressionType> {
+        None
+    }
+
     async fn infer_schema(
         &self,
         _state: &dyn Session,
diff --git a/datafusion/core/src/datasource/file_format/csv.rs 
b/datafusion/core/src/datasource/file_format/csv.rs
index 9022e340cd..9b118b4340 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -56,6 +56,7 @@ mod tests {
     use async_trait::async_trait;
     use bytes::Bytes;
     use chrono::DateTime;
+    use datafusion_common::parsers::CompressionTypeVariant;
     use futures::stream::BoxStream;
     use futures::StreamExt;
     use insta::assert_snapshot;
@@ -877,6 +878,86 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_csv_extension_compressed() -> Result<()> {
+        // Write compressed CSV files
+        // Expect: under the directory, a file is created with ".csv.gz" extension
+        let ctx = SessionContext::new();
+
+        let df = ctx
+            .read_csv(
+                &format!("{}/csv/aggregate_test_100.csv", arrow_test_data()),
+                CsvReadOptions::default().has_header(true),
+            )
+            .await?;
+
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let cfg1 = crate::dataframe::DataFrameWriteOptions::new();
+        let cfg2 = CsvOptions::default()
+            .with_has_header(true)
+            .with_compression(CompressionTypeVariant::GZIP);
+
+        df.write_csv(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        let files: Vec<_> = std::fs::read_dir(&path).unwrap().collect();
+        assert_eq!(files.len(), 1);
+        assert!(files
+            .last()
+            .unwrap()
+            .as_ref()
+            .unwrap()
+            .path()
+            .file_name()
+            .unwrap()
+            .to_str()
+            .unwrap()
+            .ends_with(".csv.gz"));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_csv_extension_uncompressed() -> Result<()> {
+        // Write plain uncompressed CSV files
+        // Expect: under the directory, a file is created with ".csv" extension
+        let ctx = SessionContext::new();
+
+        let df = ctx
+            .read_csv(
+                &format!("{}/csv/aggregate_test_100.csv", arrow_test_data()),
+                CsvReadOptions::default().has_header(true),
+            )
+            .await?;
+
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let cfg1 = crate::dataframe::DataFrameWriteOptions::new();
+        let cfg2 = CsvOptions::default().with_has_header(true);
+
+        df.write_csv(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        let files: Vec<_> = std::fs::read_dir(&path).unwrap().collect();
+        assert_eq!(files.len(), 1);
+        assert!(files
+            .last()
+            .unwrap()
+            .as_ref()
+            .unwrap()
+            .path()
+            .file_name()
+            .unwrap()
+            .to_str()
+            .unwrap()
+            .ends_with(".csv"));
+
+        Ok(())
+    }
+
     /// Read multiple empty csv files
     ///
     /// all_empty
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs 
b/datafusion/core/src/datasource/listing_table_factory.rs
index 580fa4be47..80dcdc1f34 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -128,9 +128,21 @@ impl TableProviderFactory for ListingTableFactory {
                 // if the folder then rewrite a file path as 'path/*.parquet'
                 // to only read the files the reader can understand
                 if table_path.is_folder() && table_path.get_glob().is_none() {
-                    table_path = table_path.with_glob(
-                        format!("*.{}", cmd.file_type.to_lowercase()).as_ref(),
-                    )?;
+                    // Since there are no files yet to infer an actual extension,
+                    // derive the pattern based on compression type.
+                    // So for gzipped CSV the pattern is `*.csv.gz`
+                    let glob = match options.format.compression_type() {
+                        Some(compression) => {
+                            match 
options.format.get_ext_with_compression(&compression) {
+                                // Use glob based on `FileFormat` extension
+                                Ok(ext) => format!("*.{ext}"),
+                                // Fallback to `file_type`, if not supported by `FileFormat`
+                                Err(_) => format!("*.{}", 
cmd.file_type.to_lowercase()),
+                            }
+                        }
+                        None => format!("*.{}", cmd.file_type.to_lowercase()),
+                    };
+                    table_path = table_path.with_glob(glob.as_ref())?;
                 }
                 let schema = options.infer_schema(session_state, 
&table_path).await?;
                 let df_schema = Arc::clone(&schema).to_dfschema()?;
@@ -175,6 +187,7 @@ fn get_extension(path: &str) -> String {
 
 #[cfg(test)]
 mod tests {
+    use glob::Pattern;
     use std::collections::HashMap;
 
     use super::*;
@@ -182,6 +195,7 @@ mod tests {
         datasource::file_format::csv::CsvFormat, 
execution::context::SessionContext,
     };
 
+    use datafusion_common::parsers::CompressionTypeVariant;
     use datafusion_common::{Constraints, DFSchema, TableReference};
 
     #[tokio::test]
@@ -264,4 +278,101 @@ mod tests {
         let listing_options = listing_table.options();
         assert_eq!(".tbl", listing_options.file_extension);
     }
+
+    /// Validates that CreateExternalTable with compression
+    /// searches for gzipped files in a directory location
+    #[tokio::test]
+    async fn test_create_using_folder_with_compression() {
+        let dir = tempfile::tempdir().unwrap();
+
+        let factory = ListingTableFactory::new();
+        let context = SessionContext::new();
+        let state = context.state();
+        let name = TableReference::bare("foo");
+
+        let mut options = HashMap::new();
+        options.insert("format.schema_infer_max_rec".to_owned(), 
"1000".to_owned());
+        options.insert("format.has_header".into(), "true".into());
+        options.insert("format.compression".into(), "gzip".into());
+        let cmd = CreateExternalTable {
+            name,
+            location: dir.path().to_str().unwrap().to_string(),
+            file_type: "csv".to_string(),
+            schema: Arc::new(DFSchema::empty()),
+            table_partition_cols: vec![],
+            if_not_exists: false,
+            temporary: false,
+            definition: None,
+            order_exprs: vec![],
+            unbounded: false,
+            options,
+            constraints: Constraints::default(),
+            column_defaults: HashMap::new(),
+        };
+        let table_provider = factory.create(&state, &cmd).await.unwrap();
+        let listing_table = table_provider
+            .as_any()
+            .downcast_ref::<ListingTable>()
+            .unwrap();
+
+        // Verify compression is used
+        let format = listing_table.options().format.clone();
+        let csv_format = format.as_any().downcast_ref::<CsvFormat>().unwrap();
+        let csv_options = csv_format.options().clone();
+        assert_eq!(csv_options.compression, CompressionTypeVariant::GZIP);
+
+        let listing_options = listing_table.options();
+        assert_eq!("", listing_options.file_extension);
+        // Glob pattern is set to search for gzipped files
+        let table_path = listing_table.table_paths().first().unwrap();
+        assert_eq!(
+            table_path.get_glob().clone().unwrap(),
+            Pattern::new("*.csv.gz").unwrap()
+        );
+    }
+
+    /// Validates that CreateExternalTable without compression
+    /// searches for normal files in a directory location
+    #[tokio::test]
+    async fn test_create_using_folder_without_compression() {
+        let dir = tempfile::tempdir().unwrap();
+
+        let factory = ListingTableFactory::new();
+        let context = SessionContext::new();
+        let state = context.state();
+        let name = TableReference::bare("foo");
+
+        let mut options = HashMap::new();
+        options.insert("format.schema_infer_max_rec".to_owned(), 
"1000".to_owned());
+        options.insert("format.has_header".into(), "true".into());
+        let cmd = CreateExternalTable {
+            name,
+            location: dir.path().to_str().unwrap().to_string(),
+            file_type: "csv".to_string(),
+            schema: Arc::new(DFSchema::empty()),
+            table_partition_cols: vec![],
+            if_not_exists: false,
+            temporary: false,
+            definition: None,
+            order_exprs: vec![],
+            unbounded: false,
+            options,
+            constraints: Constraints::default(),
+            column_defaults: HashMap::new(),
+        };
+        let table_provider = factory.create(&state, &cmd).await.unwrap();
+        let listing_table = table_provider
+            .as_any()
+            .downcast_ref::<ListingTable>()
+            .unwrap();
+
+        let listing_options = listing_table.options();
+        assert_eq!("", listing_options.file_extension);
+        // Glob pattern is set to search for plain (uncompressed) CSV files
+        let table_path = listing_table.table_paths().first().unwrap();
+        assert_eq!(
+            table_path.get_glob().clone().unwrap(),
+            Pattern::new("*.csv").unwrap()
+        );
+    }
 }
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index e4bf2f5985..586a082df5 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -533,6 +533,14 @@ impl DefaultPhysicalPlanner {
                 let sink_format = file_type_to_format(file_type)?
                     .create(session_state, source_option_tuples)?;
 
+                // Determine extension based on format extension and compression
+                let file_extension = match sink_format.compression_type() {
+                    Some(compression_type) => sink_format
+                        .get_ext_with_compression(&compression_type)
+                        .unwrap_or_else(|_| sink_format.get_ext()),
+                    None => sink_format.get_ext(),
+                };
+
                 // Set file sink related options
                 let config = FileSinkConfig {
                     original_url,
@@ -543,7 +551,7 @@ impl DefaultPhysicalPlanner {
                     table_partition_cols,
                     insert_op: InsertOp::Append,
                     keep_partition_by_columns,
-                    file_extension: sink_format.get_ext(),
+                    file_extension,
                 };
 
                 sink_format
diff --git a/datafusion/datasource-avro/src/file_format.rs 
b/datafusion/datasource-avro/src/file_format.rs
index 47f8d9daca..60c361b42e 100644
--- a/datafusion/datasource-avro/src/file_format.rs
+++ b/datafusion/datasource-avro/src/file_format.rs
@@ -110,6 +110,10 @@ impl FileFormat for AvroFormat {
         }
     }
 
+    fn compression_type(&self) -> Option<FileCompressionType> {
+        None
+    }
+
     async fn infer_schema(
         &self,
         _state: &dyn Session,
diff --git a/datafusion/datasource-csv/src/file_format.rs 
b/datafusion/datasource-csv/src/file_format.rs
index 50ad02fa4f..4eeb431584 100644
--- a/datafusion/datasource-csv/src/file_format.rs
+++ b/datafusion/datasource-csv/src/file_format.rs
@@ -358,6 +358,10 @@ impl FileFormat for CsvFormat {
         Ok(format!("{}{}", ext, file_compression_type.get_ext()))
     }
 
+    fn compression_type(&self) -> Option<FileCompressionType> {
+        Some(self.options.compression.into())
+    }
+
     async fn infer_schema(
         &self,
         state: &dyn Session,
diff --git a/datafusion/datasource-json/src/file_format.rs 
b/datafusion/datasource-json/src/file_format.rs
index f6b758b5bc..51f4bd7e96 100644
--- a/datafusion/datasource-json/src/file_format.rs
+++ b/datafusion/datasource-json/src/file_format.rs
@@ -185,6 +185,10 @@ impl FileFormat for JsonFormat {
         Ok(format!("{}{}", ext, file_compression_type.get_ext()))
     }
 
+    fn compression_type(&self) -> Option<FileCompressionType> {
+        Some(self.options.compression.into())
+    }
+
     async fn infer_schema(
         &self,
         _state: &dyn Session,
diff --git a/datafusion/datasource-parquet/src/file_format.rs 
b/datafusion/datasource-parquet/src/file_format.rs
index ed560e3336..82cf06b538 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -340,6 +340,10 @@ impl FileFormat for ParquetFormat {
         }
     }
 
+    fn compression_type(&self) -> Option<FileCompressionType> {
+        None
+    }
+
     async fn infer_schema(
         &self,
         state: &dyn Session,
diff --git a/datafusion/datasource/src/file_format.rs 
b/datafusion/datasource/src/file_format.rs
index b2caf5277a..e0239ab36d 100644
--- a/datafusion/datasource/src/file_format.rs
+++ b/datafusion/datasource/src/file_format.rs
@@ -61,6 +61,9 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         _file_compression_type: &FileCompressionType,
     ) -> Result<String>;
 
+    /// Returns the file compression type configured for this format, or `None` if not applicable
+    fn compression_type(&self) -> Option<FileCompressionType>;
+
     /// Infer the common schema of the provided objects. The objects will usually
     /// be analysed up to a given number of records or files (as specified in the
     /// format config) then give the estimated common schema. This might fail if


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to