This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 2741c60ad9 Use compression type in CSV file suffixes (#16609)
2741c60ad9 is described below
commit 2741c60ad9d637e5014d201802abb25562449491
Author: theirix <[email protected]>
AuthorDate: Mon Jul 7 12:28:20 2025 +0100
Use compression type in CSV file suffixes (#16609)
* Use compression type in file suffixes
- Add FileFormat::compression_type method
- Specify meaningful values for CSV only
- Use compression type as a part of extension for files
* Add CSV tests
* Add glob dep, use env logging
* Use a glob pattern with compression suffix for TableProviderFactory
* Conform to clippy standards
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
Cargo.lock | 1 +
datafusion-examples/examples/custom_file_format.rs | 4 +
datafusion-examples/examples/dataframe.rs | 1 +
datafusion/core/Cargo.toml | 1 +
.../core/src/datasource/file_format/arrow.rs | 4 +
datafusion/core/src/datasource/file_format/csv.rs | 81 ++++++++++++++
.../core/src/datasource/listing_table_factory.rs | 117 ++++++++++++++++++++-
datafusion/core/src/physical_planner.rs | 10 +-
datafusion/datasource-avro/src/file_format.rs | 4 +
datafusion/datasource-csv/src/file_format.rs | 4 +
datafusion/datasource-json/src/file_format.rs | 4 +
datafusion/datasource-parquet/src/file_format.rs | 4 +
datafusion/datasource/src/file_format.rs | 3 +
13 files changed, 234 insertions(+), 4 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 956818412c..677897516a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1872,6 +1872,7 @@ dependencies = [
"env_logger",
"flate2",
"futures",
+ "glob",
"insta",
"itertools 0.14.0",
"log",
diff --git a/datafusion-examples/examples/custom_file_format.rs
b/datafusion-examples/examples/custom_file_format.rs
index e9a4d71b16..67fe642fd4 100644
--- a/datafusion-examples/examples/custom_file_format.rs
+++ b/datafusion-examples/examples/custom_file_format.rs
@@ -81,6 +81,10 @@ impl FileFormat for TSVFileFormat {
}
}
+ fn compression_type(&self) -> Option<FileCompressionType> {
+ None
+ }
+
async fn infer_schema(
&self,
state: &dyn Session,
diff --git a/datafusion-examples/examples/dataframe.rs
b/datafusion-examples/examples/dataframe.rs
index 57a28aeca0..a5ee571a14 100644
--- a/datafusion-examples/examples/dataframe.rs
+++ b/datafusion-examples/examples/dataframe.rs
@@ -59,6 +59,7 @@ use tempfile::tempdir;
/// * [query_to_date]: execute queries against parquet files
#[tokio::main]
async fn main() -> Result<()> {
+ env_logger::init();
// The SessionContext is the main high level API for interacting with
DataFusion
let ctx = SessionContext::new();
read_parquet(&ctx).await?;
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 9747f44240..ec7ee07d7f 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -154,6 +154,7 @@ datafusion-macros = { workspace = true }
datafusion-physical-optimizer = { workspace = true }
doc-comment = { workspace = true }
env_logger = { workspace = true }
+glob = { version = "0.3.0" }
insta = { workspace = true }
paste = "^1.0"
rand = { workspace = true, features = ["small_rng"] }
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs
b/datafusion/core/src/datasource/file_format/arrow.rs
index e9bd1d2cf3..19e884601c 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -134,6 +134,10 @@ impl FileFormat for ArrowFormat {
}
}
+ fn compression_type(&self) -> Option<FileCompressionType> {
+ None
+ }
+
async fn infer_schema(
&self,
_state: &dyn Session,
diff --git a/datafusion/core/src/datasource/file_format/csv.rs
b/datafusion/core/src/datasource/file_format/csv.rs
index 9022e340cd..9b118b4340 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -56,6 +56,7 @@ mod tests {
use async_trait::async_trait;
use bytes::Bytes;
use chrono::DateTime;
+ use datafusion_common::parsers::CompressionTypeVariant;
use futures::stream::BoxStream;
use futures::StreamExt;
use insta::assert_snapshot;
@@ -877,6 +878,86 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_csv_extension_compressed() -> Result<()> {
+ // Write compressed CSV files
+ // Expect: under the directory, a file is created with ".csv.gz"
extension
+ let ctx = SessionContext::new();
+
+ let df = ctx
+ .read_csv(
+ &format!("{}/csv/aggregate_test_100.csv", arrow_test_data()),
+ CsvReadOptions::default().has_header(true),
+ )
+ .await?;
+
+ let tmp_dir = tempfile::TempDir::new().unwrap();
+ let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+ let cfg1 = crate::dataframe::DataFrameWriteOptions::new();
+ let cfg2 = CsvOptions::default()
+ .with_has_header(true)
+ .with_compression(CompressionTypeVariant::GZIP);
+
+ df.write_csv(&path, cfg1, Some(cfg2)).await?;
+ assert!(std::path::Path::new(&path).exists());
+
+ let files: Vec<_> = std::fs::read_dir(&path).unwrap().collect();
+ assert_eq!(files.len(), 1);
+ assert!(files
+ .last()
+ .unwrap()
+ .as_ref()
+ .unwrap()
+ .path()
+ .file_name()
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .ends_with(".csv.gz"));
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_csv_extension_uncompressed() -> Result<()> {
+ // Write plain uncompressed CSV files
+ // Expect: under the directory, a file is created with ".csv" extension
+ let ctx = SessionContext::new();
+
+ let df = ctx
+ .read_csv(
+ &format!("{}/csv/aggregate_test_100.csv", arrow_test_data()),
+ CsvReadOptions::default().has_header(true),
+ )
+ .await?;
+
+ let tmp_dir = tempfile::TempDir::new().unwrap();
+ let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+ let cfg1 = crate::dataframe::DataFrameWriteOptions::new();
+ let cfg2 = CsvOptions::default().with_has_header(true);
+
+ df.write_csv(&path, cfg1, Some(cfg2)).await?;
+ assert!(std::path::Path::new(&path).exists());
+
+ let files: Vec<_> = std::fs::read_dir(&path).unwrap().collect();
+ assert_eq!(files.len(), 1);
+ assert!(files
+ .last()
+ .unwrap()
+ .as_ref()
+ .unwrap()
+ .path()
+ .file_name()
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .ends_with(".csv"));
+
+ Ok(())
+ }
+
/// Read multiple empty csv files
///
/// all_empty
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs
b/datafusion/core/src/datasource/listing_table_factory.rs
index 580fa4be47..80dcdc1f34 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -128,9 +128,21 @@ impl TableProviderFactory for ListingTableFactory {
// if the folder then rewrite a file path as 'path/*.parquet'
// to only read the files the reader can understand
if table_path.is_folder() && table_path.get_glob().is_none() {
- table_path = table_path.with_glob(
- format!("*.{}", cmd.file_type.to_lowercase()).as_ref(),
- )?;
+ // Since there are no files yet to infer an actual
extension,
+ // derive the pattern based on compression type.
+ // So for gzipped CSV the pattern is `*.csv.gz`
+ let glob = match options.format.compression_type() {
+ Some(compression) => {
+ match
options.format.get_ext_with_compression(&compression) {
+ // Use glob based on `FileFormat` extension
+ Ok(ext) => format!("*.{ext}"),
+ // Fallback to `file_type`, if not supported
by `FileFormat`
+ Err(_) => format!("*.{}",
cmd.file_type.to_lowercase()),
+ }
+ }
+ None => format!("*.{}", cmd.file_type.to_lowercase()),
+ };
+ table_path = table_path.with_glob(glob.as_ref())?;
}
let schema = options.infer_schema(session_state,
&table_path).await?;
let df_schema = Arc::clone(&schema).to_dfschema()?;
@@ -175,6 +187,7 @@ fn get_extension(path: &str) -> String {
#[cfg(test)]
mod tests {
+ use glob::Pattern;
use std::collections::HashMap;
use super::*;
@@ -182,6 +195,7 @@ mod tests {
datasource::file_format::csv::CsvFormat,
execution::context::SessionContext,
};
+ use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{Constraints, DFSchema, TableReference};
#[tokio::test]
@@ -264,4 +278,101 @@ mod tests {
let listing_options = listing_table.options();
assert_eq!(".tbl", listing_options.file_extension);
}
+
+ /// Validates that CreateExternalTable with compression
+ /// searches for gzipped files in a directory location
+ #[tokio::test]
+ async fn test_create_using_folder_with_compression() {
+ let dir = tempfile::tempdir().unwrap();
+
+ let factory = ListingTableFactory::new();
+ let context = SessionContext::new();
+ let state = context.state();
+ let name = TableReference::bare("foo");
+
+ let mut options = HashMap::new();
+ options.insert("format.schema_infer_max_rec".to_owned(),
"1000".to_owned());
+ options.insert("format.has_header".into(), "true".into());
+ options.insert("format.compression".into(), "gzip".into());
+ let cmd = CreateExternalTable {
+ name,
+ location: dir.path().to_str().unwrap().to_string(),
+ file_type: "csv".to_string(),
+ schema: Arc::new(DFSchema::empty()),
+ table_partition_cols: vec![],
+ if_not_exists: false,
+ temporary: false,
+ definition: None,
+ order_exprs: vec![],
+ unbounded: false,
+ options,
+ constraints: Constraints::default(),
+ column_defaults: HashMap::new(),
+ };
+ let table_provider = factory.create(&state, &cmd).await.unwrap();
+ let listing_table = table_provider
+ .as_any()
+ .downcast_ref::<ListingTable>()
+ .unwrap();
+
+ // Verify compression is used
+ let format = listing_table.options().format.clone();
+ let csv_format = format.as_any().downcast_ref::<CsvFormat>().unwrap();
+ let csv_options = csv_format.options().clone();
+ assert_eq!(csv_options.compression, CompressionTypeVariant::GZIP);
+
+ let listing_options = listing_table.options();
+ assert_eq!("", listing_options.file_extension);
+ // Glob pattern is set to search for gzipped files
+ let table_path = listing_table.table_paths().first().unwrap();
+ assert_eq!(
+ table_path.get_glob().clone().unwrap(),
+ Pattern::new("*.csv.gz").unwrap()
+ );
+ }
+
+ /// Validates that CreateExternalTable without compression
+ /// searches for normal files in a directory location
+ #[tokio::test]
+ async fn test_create_using_folder_without_compression() {
+ let dir = tempfile::tempdir().unwrap();
+
+ let factory = ListingTableFactory::new();
+ let context = SessionContext::new();
+ let state = context.state();
+ let name = TableReference::bare("foo");
+
+ let mut options = HashMap::new();
+ options.insert("format.schema_infer_max_rec".to_owned(),
"1000".to_owned());
+ options.insert("format.has_header".into(), "true".into());
+ let cmd = CreateExternalTable {
+ name,
+ location: dir.path().to_str().unwrap().to_string(),
+ file_type: "csv".to_string(),
+ schema: Arc::new(DFSchema::empty()),
+ table_partition_cols: vec![],
+ if_not_exists: false,
+ temporary: false,
+ definition: None,
+ order_exprs: vec![],
+ unbounded: false,
+ options,
+ constraints: Constraints::default(),
+ column_defaults: HashMap::new(),
+ };
+ let table_provider = factory.create(&state, &cmd).await.unwrap();
+ let listing_table = table_provider
+ .as_any()
+ .downcast_ref::<ListingTable>()
+ .unwrap();
+
+ let listing_options = listing_table.options();
+ assert_eq!("", listing_options.file_extension);
+ // Glob pattern is set to search for plain uncompressed files
+ let table_path = listing_table.table_paths().first().unwrap();
+ assert_eq!(
+ table_path.get_glob().clone().unwrap(),
+ Pattern::new("*.csv").unwrap()
+ );
+ }
}
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index e4bf2f5985..586a082df5 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -533,6 +533,14 @@ impl DefaultPhysicalPlanner {
let sink_format = file_type_to_format(file_type)?
.create(session_state, source_option_tuples)?;
+ // Determine extension based on format extension and
compression
+ let file_extension = match sink_format.compression_type() {
+ Some(compression_type) => sink_format
+ .get_ext_with_compression(&compression_type)
+ .unwrap_or_else(|_| sink_format.get_ext()),
+ None => sink_format.get_ext(),
+ };
+
// Set file sink related options
let config = FileSinkConfig {
original_url,
@@ -543,7 +551,7 @@ impl DefaultPhysicalPlanner {
table_partition_cols,
insert_op: InsertOp::Append,
keep_partition_by_columns,
- file_extension: sink_format.get_ext(),
+ file_extension,
};
sink_format
diff --git a/datafusion/datasource-avro/src/file_format.rs
b/datafusion/datasource-avro/src/file_format.rs
index 47f8d9daca..60c361b42e 100644
--- a/datafusion/datasource-avro/src/file_format.rs
+++ b/datafusion/datasource-avro/src/file_format.rs
@@ -110,6 +110,10 @@ impl FileFormat for AvroFormat {
}
}
+ fn compression_type(&self) -> Option<FileCompressionType> {
+ None
+ }
+
async fn infer_schema(
&self,
_state: &dyn Session,
diff --git a/datafusion/datasource-csv/src/file_format.rs
b/datafusion/datasource-csv/src/file_format.rs
index 50ad02fa4f..4eeb431584 100644
--- a/datafusion/datasource-csv/src/file_format.rs
+++ b/datafusion/datasource-csv/src/file_format.rs
@@ -358,6 +358,10 @@ impl FileFormat for CsvFormat {
Ok(format!("{}{}", ext, file_compression_type.get_ext()))
}
+ fn compression_type(&self) -> Option<FileCompressionType> {
+ Some(self.options.compression.into())
+ }
+
async fn infer_schema(
&self,
state: &dyn Session,
diff --git a/datafusion/datasource-json/src/file_format.rs
b/datafusion/datasource-json/src/file_format.rs
index f6b758b5bc..51f4bd7e96 100644
--- a/datafusion/datasource-json/src/file_format.rs
+++ b/datafusion/datasource-json/src/file_format.rs
@@ -185,6 +185,10 @@ impl FileFormat for JsonFormat {
Ok(format!("{}{}", ext, file_compression_type.get_ext()))
}
+ fn compression_type(&self) -> Option<FileCompressionType> {
+ Some(self.options.compression.into())
+ }
+
async fn infer_schema(
&self,
_state: &dyn Session,
diff --git a/datafusion/datasource-parquet/src/file_format.rs
b/datafusion/datasource-parquet/src/file_format.rs
index ed560e3336..82cf06b538 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -340,6 +340,10 @@ impl FileFormat for ParquetFormat {
}
}
+ fn compression_type(&self) -> Option<FileCompressionType> {
+ None
+ }
+
async fn infer_schema(
&self,
state: &dyn Session,
diff --git a/datafusion/datasource/src/file_format.rs
b/datafusion/datasource/src/file_format.rs
index b2caf5277a..e0239ab36d 100644
--- a/datafusion/datasource/src/file_format.rs
+++ b/datafusion/datasource/src/file_format.rs
@@ -61,6 +61,9 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
_file_compression_type: &FileCompressionType,
) -> Result<String>;
+ /// Returns the file compression type used by this format, if applicable
+ fn compression_type(&self) -> Option<FileCompressionType>;
+
/// Infer the common schema of the provided objects. The objects will
usually
/// be analysed up to a given number of records or files (as specified in
the
/// format config) then give the estimated common schema. This might fail
if
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]