This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
     new 4084894ebe fix: create file for empty stream (#16342)
4084894ebe is described below

commit 4084894ebe1889b6ce80f3a207453154de274b03
Author: Chen Chongchen <chenkov...@qq.com>
AuthorDate: Thu Jun 19 05:10:19 2025 +0800

    fix: create file for empty stream (#16342)

    * fix: create csv for empty stream

    * clippy

    * fmt

    * update test
---
 datafusion/core/src/datasource/file_format/csv.rs  | 57 ++++++++++++++++++++++
 datafusion/core/src/datasource/file_format/json.rs | 56 +++++++++++++++++++++
 .../core/src/datasource/file_format/parquet.rs     | 55 +++++++++++++++++++++
 datafusion/datasource/src/file_sink_config.rs      | 37 +++++++++++---
 4 files changed, 198 insertions(+), 7 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index efec07abbc..9022e340cd 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -33,6 +33,7 @@ mod tests {
     use arrow_schema::{DataType, Field, Schema, SchemaRef};
     use datafusion_catalog::Session;
     use datafusion_common::cast::as_string_array;
+    use datafusion_common::config::CsvOptions;
     use datafusion_common::internal_err;
     use datafusion_common::stats::Precision;
     use datafusion_common::test_util::{arrow_test_data, batches_to_string};
@@ -795,6 +796,62 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_csv_write_empty_file() -> Result<()> {
+        // Case 1. write to a single file
+        // Expect: an empty file created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}/empty.csv", tmp_dir.path().to_string_lossy());
+
+        let ctx = SessionContext::new();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = CsvOptions::default().with_has_header(true);
+
+        df.write_csv(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        // Case 2. write to a directory without partition columns
+        // Expect: under the directory, an empty file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = CsvOptions::default().with_has_header(true);
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        df.write_csv(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 1);
+
+        // Case 3. write to a directory with partition columns
+        // Expect: No file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
+
+        let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
+            .with_single_file_output(true)
+            .with_partition_by(vec!["col1".to_string()]);
+        let cfg2 = CsvOptions::default().with_has_header(true);
+
+        df.write_csv(&path, cfg1, Some(cfg2)).await?;
+
+        assert!(std::path::Path::new(&path).exists());
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 0);
+
+        Ok(())
+    }
+
     /// Read a single empty csv file with header
     ///
     /// empty.csv:
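The CSV test above pins down the user-visible contract: a zero-row query
still materializes an output file. A minimal standalone reproduction of the
symptom this commit fixes might look like the sketch below; it assumes a
Tokio runtime and the `datafusion` crate, and the output path is
illustrative, not taken from the patch.

    use datafusion::common::config::CsvOptions;
    use datafusion::dataframe::DataFrameWriteOptions;
    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Zero rows, but a known schema: before this fix, writing such a
        // stream produced no output file at all.
        let df = ctx.sql("SELECT 1 AS c LIMIT 0").await?;
        df.write_csv(
            "/tmp/empty.csv", // illustrative path
            DataFrameWriteOptions::new().with_single_file_output(true),
            Some(CsvOptions::default().with_has_header(true)),
        )
        .await?;
        // After the fix, the file exists (header-only when has_header is set).
        assert!(std::path::Path::new("/tmp/empty.csv").exists());
        Ok(())
    }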
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 34d3d64f07..d818187bb3 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -31,6 +31,7 @@ mod tests {
     use arrow_schema::Schema;
     use bytes::Bytes;
     use datafusion_catalog::Session;
+    use datafusion_common::config::JsonOptions;
     use datafusion_common::test_util::batches_to_string;
     use datafusion_datasource::decoder::{
         BatchDeserializer, DecoderDeserializer, DeserializerOutput,
@@ -257,6 +258,61 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_json_write_empty_file() -> Result<()> {
+        // Case 1. write to a single file
+        // Expect: an empty file created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}/empty.json", tmp_dir.path().to_string_lossy());
+
+        let ctx = SessionContext::new();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = JsonOptions::default();
+
+        df.write_json(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        // Case 2. write to a directory without partition columns
+        // Expect: under the directory, an empty file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = JsonOptions::default();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        df.write_json(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 1);
+
+        // Case 3. write to a directory with partition columns
+        // Expect: No file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
+
+        let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
+            .with_single_file_output(true)
+            .with_partition_by(vec!["col1".to_string()]);
+        let cfg2 = JsonOptions::default();
+
+        df.write_json(&path, cfg1, Some(cfg2)).await?;
+
+        assert!(std::path::Path::new(&path).exists());
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 0);
+        Ok(())
+    }
+
     #[test]
     fn test_json_deserializer_finish() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 6a5c19829c..dcd20f8a26 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1246,6 +1246,61 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_parquet_write_empty_file() -> Result<()> {
+        // Case 1. write to a single file
+        // Expect: an empty file created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}/empty.parquet", tmp_dir.path().to_string_lossy());
+
+        let ctx = SessionContext::new();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = TableParquetOptions::default();
+
+        df.write_parquet(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        // Case 2. write to a directory without partition columns
+        // Expect: under the directory, an empty file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = TableParquetOptions::default();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        df.write_parquet(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 1);
+
+        // Case 3. write to a directory with partition columns
+        // Expect: No file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
+
+        let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
+            .with_single_file_output(true)
+            .with_partition_by(vec!["col1".to_string()]);
+        let cfg2 = TableParquetOptions::default();
+
+        df.write_parquet(&path, cfg1, Some(cfg2)).await?;
+
+        assert!(std::path::Path::new(&path).exists());
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 0);
+        Ok(())
+    }
+
     #[tokio::test]
     async fn parquet_sink_write_insert_schema_into_metadata() -> Result<()> {
         // expected kv metadata without schema
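The file_sink_config.rs change below is the heart of the fix: when the
writer tasks report zero rows written, the demuxer is run a second time over
a one-element stream holding an empty RecordBatch, so downstream writers
still open and finalize an output file. A rough standalone sketch of that
stream construction, assuming a SchemaRef is already in hand; the helper
name is illustrative, not part of the patch.

    use std::sync::Arc;

    use arrow::array::RecordBatch;
    use arrow::datatypes::SchemaRef;
    use datafusion_execution::SendableRecordBatchStream;
    use datafusion_physical_plan::stream::RecordBatchStreamAdapter;

    fn empty_stream(schema: SchemaRef) -> SendableRecordBatchStream {
        // Zero rows but the full schema: enough for a writer to emit headers
        // (CSV) or file metadata (Parquet) for an otherwise empty file.
        let empty_batch = RecordBatch::new_empty(Arc::clone(&schema));
        Box::pin(RecordBatchStreamAdapter::new(
            schema,
            futures::stream::iter(vec![Ok(empty_batch)]),
        ))
    }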
diff --git a/datafusion/datasource/src/file_sink_config.rs b/datafusion/datasource/src/file_sink_config.rs
index 2968bd1ee0..8a86b11a47 100644
--- a/datafusion/datasource/src/file_sink_config.rs
+++ b/datafusion/datasource/src/file_sink_config.rs
@@ -22,12 +22,14 @@ use crate::sink::DataSink;
 use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver};
 use crate::ListingTableUrl;
 
+use arrow::array::RecordBatch;
 use arrow::datatypes::{DataType, SchemaRef};
 use datafusion_common::Result;
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
+use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
 
 use async_trait::async_trait;
 use object_store::ObjectStore;
@@ -77,13 +79,34 @@ pub trait FileSink: DataSink {
             .runtime_env()
             .object_store(&config.object_store_url)?;
         let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
-        self.spawn_writer_tasks_and_join(
-            context,
-            demux_task,
-            file_stream_rx,
-            object_store,
-        )
-        .await
+        let mut num_rows = self
+            .spawn_writer_tasks_and_join(
+                context,
+                demux_task,
+                file_stream_rx,
+                Arc::clone(&object_store),
+            )
+            .await?;
+        if num_rows == 0 {
+            // If no rows were written, then no files are output either.
+            // In this case, send an empty record batch through to ensure the output file is generated
+            let schema = Arc::clone(&config.output_schema);
+            let empty_batch = RecordBatch::new_empty(Arc::clone(&schema));
+            let data = Box::pin(RecordBatchStreamAdapter::new(
+                schema,
+                futures::stream::iter(vec![Ok(empty_batch)]),
+            ));
+            let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
+            num_rows = self
+                .spawn_writer_tasks_and_join(
+                    context,
+                    demux_task,
+                    file_stream_rx,
+                    Arc::clone(&object_store),
+                )
+                .await?;
+        }
+        Ok(num_rows)
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org