This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4084894ebe fix: create file for empty stream (#16342)
4084894ebe is described below
commit 4084894ebe1889b6ce80f3a207453154de274b03
Author: Chen Chongchen <[email protected]>
AuthorDate: Thu Jun 19 05:10:19 2025 +0800
fix: create file for empty stream (#16342)
* fix: create csv for empty stream
* clippy
* fmt
* update test
---
datafusion/core/src/datasource/file_format/csv.rs | 57 ++++++++++++++++++++++
datafusion/core/src/datasource/file_format/json.rs | 56 +++++++++++++++++++++
.../core/src/datasource/file_format/parquet.rs | 55 +++++++++++++++++++++
datafusion/datasource/src/file_sink_config.rs | 37 +++++++++++---
4 files changed, 198 insertions(+), 7 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index efec07abbc..9022e340cd 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -33,6 +33,7 @@ mod tests {
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_catalog::Session;
use datafusion_common::cast::as_string_array;
+ use datafusion_common::config::CsvOptions;
use datafusion_common::internal_err;
use datafusion_common::stats::Precision;
use datafusion_common::test_util::{arrow_test_data, batches_to_string};
@@ -795,6 +796,62 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_csv_write_empty_file() -> Result<()> {
+ // Case 1. write to a single file
+ // Expect: an empty file created
+ let tmp_dir = tempfile::TempDir::new().unwrap();
+ let path = format!("{}/empty.csv", tmp_dir.path().to_string_lossy());
+
+ let ctx = SessionContext::new();
+
+ let df = ctx.sql("SELECT 1 limit 0").await?;
+
+ let cfg1 =
+     crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+ let cfg2 = CsvOptions::default().with_has_header(true);
+
+ df.write_csv(&path, cfg1, Some(cfg2)).await?;
+ assert!(std::path::Path::new(&path).exists());
+
+ // Case 2. write to a directory without partition columns
+ // Expect: under the directory, an empty file is created
+ let tmp_dir = tempfile::TempDir::new().unwrap();
+ let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+ let cfg1 =
+     crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+ let cfg2 = CsvOptions::default().with_has_header(true);
+
+ let df = ctx.sql("SELECT 1 limit 0").await?;
+
+ df.write_csv(&path, cfg1, Some(cfg2)).await?;
+ assert!(std::path::Path::new(&path).exists());
+
+ let files = std::fs::read_dir(&path).unwrap();
+ assert!(files.count() == 1);
+
+ // Case 3. write to a directory with partition columns
+ // Expect: No file is created
+ let tmp_dir = tempfile::TempDir::new().unwrap();
+ let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+ let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
+
+ let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
+ .with_single_file_output(true)
+ .with_partition_by(vec!["col1".to_string()]);
+ let cfg2 = CsvOptions::default().with_has_header(true);
+
+ df.write_csv(&path, cfg1, Some(cfg2)).await?;
+
+ assert!(std::path::Path::new(&path).exists());
+ let files = std::fs::read_dir(&path).unwrap();
+ assert!(files.count() == 0);
+
+ Ok(())
+ }
+
/// Read a single empty csv file with header
///
/// empty.csv:
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 34d3d64f07..d818187bb3 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -31,6 +31,7 @@ mod tests {
use arrow_schema::Schema;
use bytes::Bytes;
use datafusion_catalog::Session;
+ use datafusion_common::config::JsonOptions;
use datafusion_common::test_util::batches_to_string;
use datafusion_datasource::decoder::{
BatchDeserializer, DecoderDeserializer, DeserializerOutput,
@@ -257,6 +258,61 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_json_write_empty_file() -> Result<()> {
+ // Case 1. write to a single file
+ // Expect: an empty file created
+ let tmp_dir = tempfile::TempDir::new().unwrap();
+ let path = format!("{}/empty.json", tmp_dir.path().to_string_lossy());
+
+ let ctx = SessionContext::new();
+
+ let df = ctx.sql("SELECT 1 limit 0").await?;
+
+ let cfg1 =
+     crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+ let cfg2 = JsonOptions::default();
+
+ df.write_json(&path, cfg1, Some(cfg2)).await?;
+ assert!(std::path::Path::new(&path).exists());
+
+ // Case 2. write to a directory without partition columns
+ // Expect: under the directory, an empty file is created
+ let tmp_dir = tempfile::TempDir::new().unwrap();
+ let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+ let cfg1 =
+     crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+ let cfg2 = JsonOptions::default();
+
+ let df = ctx.sql("SELECT 1 limit 0").await?;
+
+ df.write_json(&path, cfg1, Some(cfg2)).await?;
+ assert!(std::path::Path::new(&path).exists());
+
+ let files = std::fs::read_dir(&path).unwrap();
+ assert!(files.count() == 1);
+
+ // Case 3. write to a directory with partition columns
+ // Expect: No file is created
+ let tmp_dir = tempfile::TempDir::new().unwrap();
+ let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+ let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
+
+ let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
+ .with_single_file_output(true)
+ .with_partition_by(vec!["col1".to_string()]);
+ let cfg2 = JsonOptions::default();
+
+ df.write_json(&path, cfg1, Some(cfg2)).await?;
+
+ assert!(std::path::Path::new(&path).exists());
+ let files = std::fs::read_dir(&path).unwrap();
+ assert!(files.count() == 0);
+ Ok(())
+ }
+
#[test]
fn test_json_deserializer_finish() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 6a5c19829c..dcd20f8a26 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1246,6 +1246,61 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_parquet_write_empty_file() -> Result<()> {
+ // Case 1. write to a single file
+ // Expect: an empty file created
+ let tmp_dir = tempfile::TempDir::new().unwrap();
+ let path = format!("{}/empty.parquet", tmp_dir.path().to_string_lossy());
+
+ let ctx = SessionContext::new();
+
+ let df = ctx.sql("SELECT 1 limit 0").await?;
+
+ let cfg1 =
+     crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+ let cfg2 = TableParquetOptions::default();
+
+ df.write_parquet(&path, cfg1, Some(cfg2)).await?;
+ assert!(std::path::Path::new(&path).exists());
+
+ // Case 2. write to a directory without partition columns
+ // Expect: under the directory, an empty file is created
+ let tmp_dir = tempfile::TempDir::new().unwrap();
+ let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+ let cfg1 =
+     crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+ let cfg2 = TableParquetOptions::default();
+
+ let df = ctx.sql("SELECT 1 limit 0").await?;
+
+ df.write_parquet(&path, cfg1, Some(cfg2)).await?;
+ assert!(std::path::Path::new(&path).exists());
+
+ let files = std::fs::read_dir(&path).unwrap();
+ assert!(files.count() == 1);
+
+ // Case 3. write to a directory with partition columns
+ // Expect: No file is created
+ let tmp_dir = tempfile::TempDir::new().unwrap();
+ let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+ let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
+
+ let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
+ .with_single_file_output(true)
+ .with_partition_by(vec!["col1".to_string()]);
+ let cfg2 = TableParquetOptions::default();
+
+ df.write_parquet(&path, cfg1, Some(cfg2)).await?;
+
+ assert!(std::path::Path::new(&path).exists());
+ let files = std::fs::read_dir(&path).unwrap();
+ assert!(files.count() == 0);
+ Ok(())
+ }
+
#[tokio::test]
async fn parquet_sink_write_insert_schema_into_metadata() -> Result<()> {
// expected kv metadata without schema
diff --git a/datafusion/datasource/src/file_sink_config.rs b/datafusion/datasource/src/file_sink_config.rs
index 2968bd1ee0..8a86b11a47 100644
--- a/datafusion/datasource/src/file_sink_config.rs
+++ b/datafusion/datasource/src/file_sink_config.rs
@@ -22,12 +22,14 @@ use crate::sink::DataSink;
use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver};
use crate::ListingTableUrl;
+use arrow::array::RecordBatch;
use arrow::datatypes::{DataType, SchemaRef};
use datafusion_common::Result;
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
+use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use async_trait::async_trait;
use object_store::ObjectStore;
@@ -77,13 +79,34 @@ pub trait FileSink: DataSink {
.runtime_env()
.object_store(&config.object_store_url)?;
let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
- self.spawn_writer_tasks_and_join(
- context,
- demux_task,
- file_stream_rx,
- object_store,
- )
- .await
+ let mut num_rows = self
+ .spawn_writer_tasks_and_join(
+ context,
+ demux_task,
+ file_stream_rx,
+ Arc::clone(&object_store),
+ )
+ .await?;
+ if num_rows == 0 {
+ // If no rows were written, then no files are output either.
+ // In this case, send an empty record batch through to ensure the output file is generated
+ let schema = Arc::clone(&config.output_schema);
+ let empty_batch = RecordBatch::new_empty(Arc::clone(&schema));
+ let data = Box::pin(RecordBatchStreamAdapter::new(
+ schema,
+ futures::stream::iter(vec![Ok(empty_batch)]),
+ ));
+ let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
+ num_rows = self
+ .spawn_writer_tasks_and_join(
+ context,
+ demux_task,
+ file_stream_rx,
+ Arc::clone(&object_store),
+ )
+ .await?;
+ }
+ Ok(num_rows)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]