This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 33be09ad12 Revert "fix: create file for empty stream" (#16682)
33be09ad12 is described below

commit 33be09ad12a3f45992615daf0e79cfe18ba657bc
Author: Bruno <[email protected]>
AuthorDate: Tue Jul 8 22:15:55 2025 +0200

    Revert "fix: create file for empty stream" (#16682)
    
    * Revert "fix: create file for empty stream (#16342)"
    
    This reverts commit 4084894ebe1889b6ce80f3a207453154de274b03.
    
    This revert also adds a test showcasing the functionality that the
    reverted commit broke: writing a parquet file from an empty RecordBatch.
    
    * Add verification that the schema is correct
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/src/datasource/file_format/csv.rs  | 56 -------------------
 datafusion/core/src/datasource/file_format/json.rs | 56 -------------------
 .../core/src/datasource/file_format/parquet.rs     | 65 +++++++---------------
 datafusion/datasource/src/file_sink_config.rs      | 37 +++---------
 4 files changed, 26 insertions(+), 188 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 9b118b4340..777895ca61 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -797,62 +797,6 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_csv_write_empty_file() -> Result<()> {
-        // Case 1. write to a single file
-        // Expect: an empty file created
-        let tmp_dir = tempfile::TempDir::new().unwrap();
-        let path = format!("{}/empty.csv", tmp_dir.path().to_string_lossy());
-
-        let ctx = SessionContext::new();
-
-        let df = ctx.sql("SELECT 1 limit 0").await?;
-
-        let cfg1 =
-            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
-        let cfg2 = CsvOptions::default().with_has_header(true);
-
-        df.write_csv(&path, cfg1, Some(cfg2)).await?;
-        assert!(std::path::Path::new(&path).exists());
-
-        // Case 2. write to a directory without partition columns
-        // Expect: under the directory, an empty file is created
-        let tmp_dir = tempfile::TempDir::new().unwrap();
-        let path = format!("{}", tmp_dir.path().to_string_lossy());
-
-        let cfg1 =
-            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
-        let cfg2 = CsvOptions::default().with_has_header(true);
-
-        let df = ctx.sql("SELECT 1 limit 0").await?;
-
-        df.write_csv(&path, cfg1, Some(cfg2)).await?;
-        assert!(std::path::Path::new(&path).exists());
-
-        let files = std::fs::read_dir(&path).unwrap();
-        assert!(files.count() == 1);
-
-        // Case 3. write to a directory with partition columns
-        // Expect: No file is created
-        let tmp_dir = tempfile::TempDir::new().unwrap();
-        let path = format!("{}", tmp_dir.path().to_string_lossy());
-
-        let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
-
-        let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
-            .with_single_file_output(true)
-            .with_partition_by(vec!["col1".to_string()]);
-        let cfg2 = CsvOptions::default().with_has_header(true);
-
-        df.write_csv(&path, cfg1, Some(cfg2)).await?;
-
-        assert!(std::path::Path::new(&path).exists());
-        let files = std::fs::read_dir(&path).unwrap();
-        assert!(files.count() == 0);
-
-        Ok(())
-    }
-
     /// Read a single empty csv file with header
     ///
     /// empty.csv:
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index d818187bb3..34d3d64f07 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -31,7 +31,6 @@ mod tests {
     use arrow_schema::Schema;
     use bytes::Bytes;
     use datafusion_catalog::Session;
-    use datafusion_common::config::JsonOptions;
     use datafusion_common::test_util::batches_to_string;
     use datafusion_datasource::decoder::{
         BatchDeserializer, DecoderDeserializer, DeserializerOutput,
@@ -258,61 +257,6 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_json_write_empty_file() -> Result<()> {
-        // Case 1. write to a single file
-        // Expect: an empty file created
-        let tmp_dir = tempfile::TempDir::new().unwrap();
-        let path = format!("{}/empty.json", tmp_dir.path().to_string_lossy());
-
-        let ctx = SessionContext::new();
-
-        let df = ctx.sql("SELECT 1 limit 0").await?;
-
-        let cfg1 =
-            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
-        let cfg2 = JsonOptions::default();
-
-        df.write_json(&path, cfg1, Some(cfg2)).await?;
-        assert!(std::path::Path::new(&path).exists());
-
-        // Case 2. write to a directory without partition columns
-        // Expect: under the directory, an empty file is created
-        let tmp_dir = tempfile::TempDir::new().unwrap();
-        let path = format!("{}", tmp_dir.path().to_string_lossy());
-
-        let cfg1 =
-            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
-        let cfg2 = JsonOptions::default();
-
-        let df = ctx.sql("SELECT 1 limit 0").await?;
-
-        df.write_json(&path, cfg1, Some(cfg2)).await?;
-        assert!(std::path::Path::new(&path).exists());
-
-        let files = std::fs::read_dir(&path).unwrap();
-        assert!(files.count() == 1);
-
-        // Case 3. write to a directory with partition columns
-        // Expect: No file is created
-        let tmp_dir = tempfile::TempDir::new().unwrap();
-        let path = format!("{}", tmp_dir.path().to_string_lossy());
-
-        let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
-
-        let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
-            .with_single_file_output(true)
-            .with_partition_by(vec!["col1".to_string()]);
-        let cfg2 = JsonOptions::default();
-
-        df.write_json(&path, cfg1, Some(cfg2)).await?;
-
-        assert!(std::path::Path::new(&path).exists());
-        let files = std::fs::read_dir(&path).unwrap();
-        assert!(files.count() == 0);
-        Ok(())
-    }
-
     #[test]
     fn test_json_deserializer_finish() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 8a2db3431f..385e0f7b9d 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1263,57 +1263,30 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_parquet_write_empty_file() -> Result<()> {
-        // Case 1. write to a single file
-        // Expect: an empty file created
-        let tmp_dir = tempfile::TempDir::new().unwrap();
-        let path = format!("{}/empty.parquet", tmp_dir.path().to_string_lossy());
-
-        let ctx = SessionContext::new();
-
-        let df = ctx.sql("SELECT 1 limit 0").await?;
-
-        let cfg1 =
-            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
-        let cfg2 = TableParquetOptions::default();
-
-        df.write_parquet(&path, cfg1, Some(cfg2)).await?;
-        assert!(std::path::Path::new(&path).exists());
+    async fn test_write_empty_recordbatch_creates_file() -> Result<()> {
+        let empty_record_batch = RecordBatch::try_new(
+            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
+            vec![Arc::new(Int32Array::from(Vec::<i32>::new()))],
+        )
+        .expect("Failed to create empty RecordBatch");
 
-        // Case 2. write to a directory without partition columns
-        // Expect: under the directory, an empty file is created
         let tmp_dir = tempfile::TempDir::new().unwrap();
-        let path = format!("{}", tmp_dir.path().to_string_lossy());
-
-        let cfg1 =
-            crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
-        let cfg2 = TableParquetOptions::default();
+        let path = format!("{}/empty2.parquet", tmp_dir.path().to_string_lossy());
 
-        let df = ctx.sql("SELECT 1 limit 0").await?;
-
-        df.write_parquet(&path, cfg1, Some(cfg2)).await?;
+        let ctx = SessionContext::new();
+        let df = ctx.read_batch(empty_record_batch.clone())?;
+        df.write_parquet(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
+            .await?;
         assert!(std::path::Path::new(&path).exists());
 
-        let files = std::fs::read_dir(&path).unwrap();
-        assert!(files.count() == 1);
-
-        // Case 3. write to a directory with partition columns
-        // Expect: No file is created
-        let tmp_dir = tempfile::TempDir::new().unwrap();
-        let path = format!("{}", tmp_dir.path().to_string_lossy());
-
-        let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
-
-        let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
-            .with_single_file_output(true)
-            .with_partition_by(vec!["col1".to_string()]);
-        let cfg2 = TableParquetOptions::default();
-
-        df.write_parquet(&path, cfg1, Some(cfg2)).await?;
-
-        assert!(std::path::Path::new(&path).exists());
-        let files = std::fs::read_dir(&path).unwrap();
-        assert!(files.count() == 0);
+        let stream = ctx
+            .read_parquet(&path, ParquetReadOptions::new())
+            .await?
+            .execute_stream()
+            .await?;
+        assert_eq!(stream.schema(), empty_record_batch.schema());
+        let results = stream.collect::<Vec<_>>().await;
+        assert_eq!(results.len(), 0);
         Ok(())
     }
 
diff --git a/datafusion/datasource/src/file_sink_config.rs b/datafusion/datasource/src/file_sink_config.rs
index 8a86b11a47..2968bd1ee0 100644
--- a/datafusion/datasource/src/file_sink_config.rs
+++ b/datafusion/datasource/src/file_sink_config.rs
@@ -22,14 +22,12 @@ use crate::sink::DataSink;
 use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver};
 use crate::ListingTableUrl;
 
-use arrow::array::RecordBatch;
 use arrow::datatypes::{DataType, SchemaRef};
 use datafusion_common::Result;
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
-use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
 
 use async_trait::async_trait;
 use object_store::ObjectStore;
@@ -79,34 +77,13 @@ pub trait FileSink: DataSink {
             .runtime_env()
             .object_store(&config.object_store_url)?;
         let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
-        let mut num_rows = self
-            .spawn_writer_tasks_and_join(
-                context,
-                demux_task,
-                file_stream_rx,
-                Arc::clone(&object_store),
-            )
-            .await?;
-        if num_rows == 0 {
-            // If no rows were written, then no files are output either.
-            // In this case, send an empty record batch through to ensure the output file is generated
-            let schema = Arc::clone(&config.output_schema);
-            let empty_batch = RecordBatch::new_empty(Arc::clone(&schema));
-            let data = Box::pin(RecordBatchStreamAdapter::new(
-                schema,
-                futures::stream::iter(vec![Ok(empty_batch)]),
-            ));
-            let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
-            num_rows = self
-                .spawn_writer_tasks_and_join(
-                    context,
-                    demux_task,
-                    file_stream_rx,
-                    Arc::clone(&object_store),
-                )
-                .await?;
-        }
-        Ok(num_rows)
+        self.spawn_writer_tasks_and_join(
+            context,
+            demux_task,
+            file_stream_rx,
+            object_store,
+        )
+        .await
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to