This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4084894ebe fix: create file for empty stream (#16342)
4084894ebe is described below

commit 4084894ebe1889b6ce80f3a207453154de274b03
Author: Chen Chongchen <chenkov...@qq.com>
AuthorDate: Thu Jun 19 05:10:19 2025 +0800

    fix: create file for empty stream (#16342)
    
    * fix: create csv for empty stream
    
    * clippy
    
    * fmt
    
    * update test
---
 datafusion/core/src/datasource/file_format/csv.rs  | 57 ++++++++++++++++++++++
 datafusion/core/src/datasource/file_format/json.rs | 56 +++++++++++++++++++++
 .../core/src/datasource/file_format/parquet.rs     | 55 +++++++++++++++++++++
 datafusion/datasource/src/file_sink_config.rs      | 37 +++++++++++---
 4 files changed, 198 insertions(+), 7 deletions(-)
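
For context, the behavior this commit guarantees can be shown with a minimal,
hypothetical reproduction (the /tmp path is illustrative; the
DataFrameWriteOptions / CsvOptions calls mirror the tests added below):

    // Sketch only: before this commit, writing a zero-row DataFrame produced
    // no output file at all; afterwards an empty file (header-only for CSV
    // with has_header = true) is created for non-partitioned writes.
    use datafusion::config::CsvOptions;
    use datafusion::dataframe::DataFrameWriteOptions;
    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // A query with a schema but zero rows.
        let df = ctx.sql("SELECT 1 LIMIT 0").await?;
        let opts = DataFrameWriteOptions::new().with_single_file_output(true);
        let csv_opts = CsvOptions::default().with_has_header(true);
        df.write_csv("/tmp/empty.csv", opts, Some(csv_opts)).await?;
        // With this fix, the file exists even though no rows were written.
        assert!(std::path::Path::new("/tmp/empty.csv").exists());
        Ok(())
    }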

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index efec07abbc..9022e340cd 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -33,6 +33,7 @@ mod tests {
     use arrow_schema::{DataType, Field, Schema, SchemaRef};
     use datafusion_catalog::Session;
     use datafusion_common::cast::as_string_array;
+    use datafusion_common::config::CsvOptions;
     use datafusion_common::internal_err;
     use datafusion_common::stats::Precision;
     use datafusion_common::test_util::{arrow_test_data, batches_to_string};
@@ -795,6 +796,62 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_csv_write_empty_file() -> Result<()> {
+        // Case 1. write to a single file
+        // Expect: an empty file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}/empty.csv", tmp_dir.path().to_string_lossy());
+
+        let ctx = SessionContext::new();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = CsvOptions::default().with_has_header(true);
+
+        df.write_csv(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        // Case 2. write to a directory without partition columns
+        // Expect: under the directory, an empty file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = CsvOptions::default().with_has_header(true);
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        df.write_csv(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 1);
+
+        // Case 3. write to a directory with partition columns
+        // Expect: no file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
+
+        let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
+            .with_single_file_output(true)
+            .with_partition_by(vec!["col1".to_string()]);
+        let cfg2 = CsvOptions::default().with_has_header(true);
+
+        df.write_csv(&path, cfg1, Some(cfg2)).await?;
+
+        assert!(std::path::Path::new(&path).exists());
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 0);
+
+        Ok(())
+    }
+
     /// Read a single empty csv file with header
     ///
     /// empty.csv:
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 34d3d64f07..d818187bb3 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -31,6 +31,7 @@ mod tests {
     use arrow_schema::Schema;
     use bytes::Bytes;
     use datafusion_catalog::Session;
+    use datafusion_common::config::JsonOptions;
     use datafusion_common::test_util::batches_to_string;
     use datafusion_datasource::decoder::{
         BatchDeserializer, DecoderDeserializer, DeserializerOutput,
@@ -257,6 +258,61 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_json_write_empty_file() -> Result<()> {
+        // Case 1. write to a single file
+        // Expect: an empty file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}/empty.json", tmp_dir.path().to_string_lossy());
+
+        let ctx = SessionContext::new();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = JsonOptions::default();
+
+        df.write_json(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        // Case 2. write to a directory without partition columns
+        // Expect: under the directory, an empty file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = JsonOptions::default();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        df.write_json(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 1);
+
+        // Case 3. write to a directory with partition columns
+        // Expect: no file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
+
+        let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
+            .with_single_file_output(true)
+            .with_partition_by(vec!["col1".to_string()]);
+        let cfg2 = JsonOptions::default();
+
+        df.write_json(&path, cfg1, Some(cfg2)).await?;
+
+        assert!(std::path::Path::new(&path).exists());
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 0);
+        Ok(())
+    }
+
     #[test]
     fn test_json_deserializer_finish() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 6a5c19829c..dcd20f8a26 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1246,6 +1246,61 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_parquet_write_empty_file() -> Result<()> {
+        // Case 1. write to a single file
+        // Expect: an empty file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}/empty.parquet", 
tmp_dir.path().to_string_lossy());
+
+        let ctx = SessionContext::new();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = TableParquetOptions::default();
+
+        df.write_parquet(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        // Case 2. write to a directory without partition columns
+        // Expect: under the directory, an empty file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let cfg1 =
+            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
+        let cfg2 = TableParquetOptions::default();
+
+        let df = ctx.sql("SELECT 1 limit 0").await?;
+
+        df.write_parquet(&path, cfg1, Some(cfg2)).await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 1);
+
+        // Case 3. write to a directory with partition columns
+        // Expect: no file is created
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}", tmp_dir.path().to_string_lossy());
+
+        let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
+
+        let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
+            .with_single_file_output(true)
+            .with_partition_by(vec!["col1".to_string()]);
+        let cfg2 = TableParquetOptions::default();
+
+        df.write_parquet(&path, cfg1, Some(cfg2)).await?;
+
+        assert!(std::path::Path::new(&path).exists());
+        let files = std::fs::read_dir(&path).unwrap();
+        assert!(files.count() == 0);
+        Ok(())
+    }
+
     #[tokio::test]
     async fn parquet_sink_write_insert_schema_into_metadata() -> Result<()> {
         // expected kv metadata without schema
diff --git a/datafusion/datasource/src/file_sink_config.rs b/datafusion/datasource/src/file_sink_config.rs
index 2968bd1ee0..8a86b11a47 100644
--- a/datafusion/datasource/src/file_sink_config.rs
+++ b/datafusion/datasource/src/file_sink_config.rs
@@ -22,12 +22,14 @@ use crate::sink::DataSink;
 use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver};
 use crate::ListingTableUrl;
 
+use arrow::array::RecordBatch;
 use arrow::datatypes::{DataType, SchemaRef};
 use datafusion_common::Result;
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
+use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
 
 use async_trait::async_trait;
 use object_store::ObjectStore;
@@ -77,13 +79,34 @@ pub trait FileSink: DataSink {
             .runtime_env()
             .object_store(&config.object_store_url)?;
         let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
-        self.spawn_writer_tasks_and_join(
-            context,
-            demux_task,
-            file_stream_rx,
-            object_store,
-        )
-        .await
+        let mut num_rows = self
+            .spawn_writer_tasks_and_join(
+                context,
+                demux_task,
+                file_stream_rx,
+                Arc::clone(&object_store),
+            )
+            .await?;
+        if num_rows == 0 {
+            // If no rows were written, then no files are output either.
+            // In this case, send an empty record batch through to ensure the output file is generated
+            let schema = Arc::clone(&config.output_schema);
+            let empty_batch = RecordBatch::new_empty(Arc::clone(&schema));
+            let data = Box::pin(RecordBatchStreamAdapter::new(
+                schema,
+                futures::stream::iter(vec![Ok(empty_batch)]),
+            ));
+            let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
+            num_rows = self
+                .spawn_writer_tasks_and_join(
+                    context,
+                    demux_task,
+                    file_stream_rx,
+                    Arc::clone(&object_store),
+                )
+                .await?;
+        }
+        Ok(num_rows)
     }
 }
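
The fallback added in file_sink_config.rs boils down to: run the demux/write
pipeline once, and if it reports zero rows written, run it a second time over
a stream containing a single empty RecordBatch so an output file is still
materialized. Below is a hedged sketch of just the stream construction, using
the hypothetical helper name make_empty_stream (the surrounding retry logic
is in the hunk above):

    use std::sync::Arc;

    use arrow::array::RecordBatch;
    use arrow::datatypes::SchemaRef;
    use datafusion_execution::SendableRecordBatchStream;
    use datafusion_physical_plan::stream::RecordBatchStreamAdapter;

    /// Wrap one empty batch carrying `schema` into a record batch stream;
    /// feeding it through the demuxer forces the sink to create its file.
    fn make_empty_stream(schema: SchemaRef) -> SendableRecordBatchStream {
        let empty_batch = RecordBatch::new_empty(Arc::clone(&schema));
        Box::pin(RecordBatchStreamAdapter::new(
            schema,
            futures::stream::iter(vec![Ok(empty_batch)]),
        ))
    }

Note how this interacts with the partitioned case (Case 3 in the tests): the
demuxer derives partition directories from row values, and an empty batch
contributes none, so no partition files are created, which is exactly what
those tests assert.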
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org