This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 33be09ad12 Revert "fix: create file for empty stream" (#16682)
33be09ad12 is described below
commit 33be09ad12a3f45992615daf0e79cfe18ba657bc
Author: Bruno <[email protected]>
AuthorDate: Tue Jul 8 22:15:55 2025 +0200
Revert "fix: create file for empty stream" (#16682)
* Revert "fix: create file for empty stream (#16342)"
This reverts commit 4084894ebe1889b6ce80f3a207453154de274b03.
It adds a test showcasing the functionality that the commit above broke:
writing a parquet file from an empty RecordBatch.
* Add verification that the schema is correct
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/src/datasource/file_format/csv.rs | 56 -------------------
datafusion/core/src/datasource/file_format/json.rs | 56 -------------------
.../core/src/datasource/file_format/parquet.rs | 65 +++++++---------------
datafusion/datasource/src/file_sink_config.rs | 37 +++---------
4 files changed, 26 insertions(+), 188 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 9b118b4340..777895ca61 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -797,62 +797,6 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn test_csv_write_empty_file() -> Result<()> {
- // Case 1. write to a single file
- // Expect: an empty file created
- let tmp_dir = tempfile::TempDir::new().unwrap();
- let path = format!("{}/empty.csv", tmp_dir.path().to_string_lossy());
-
- let ctx = SessionContext::new();
-
- let df = ctx.sql("SELECT 1 limit 0").await?;
-
- let cfg1 =
- crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
- let cfg2 = CsvOptions::default().with_has_header(true);
-
- df.write_csv(&path, cfg1, Some(cfg2)).await?;
- assert!(std::path::Path::new(&path).exists());
-
- // Case 2. write to a directory without partition columns
- // Expect: under the directory, an empty file is created
- let tmp_dir = tempfile::TempDir::new().unwrap();
- let path = format!("{}", tmp_dir.path().to_string_lossy());
-
- let cfg1 =
- crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
- let cfg2 = CsvOptions::default().with_has_header(true);
-
- let df = ctx.sql("SELECT 1 limit 0").await?;
-
- df.write_csv(&path, cfg1, Some(cfg2)).await?;
- assert!(std::path::Path::new(&path).exists());
-
- let files = std::fs::read_dir(&path).unwrap();
- assert!(files.count() == 1);
-
- // Case 3. write to a directory with partition columns
- // Expect: No file is created
- let tmp_dir = tempfile::TempDir::new().unwrap();
- let path = format!("{}", tmp_dir.path().to_string_lossy());
-
- let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
-
- let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
- .with_single_file_output(true)
- .with_partition_by(vec!["col1".to_string()]);
- let cfg2 = CsvOptions::default().with_has_header(true);
-
- df.write_csv(&path, cfg1, Some(cfg2)).await?;
-
- assert!(std::path::Path::new(&path).exists());
- let files = std::fs::read_dir(&path).unwrap();
- assert!(files.count() == 0);
-
- Ok(())
- }
-
/// Read a single empty csv file with header
///
/// empty.csv:
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index d818187bb3..34d3d64f07 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -31,7 +31,6 @@ mod tests {
use arrow_schema::Schema;
use bytes::Bytes;
use datafusion_catalog::Session;
- use datafusion_common::config::JsonOptions;
use datafusion_common::test_util::batches_to_string;
use datafusion_datasource::decoder::{
BatchDeserializer, DecoderDeserializer, DeserializerOutput,
@@ -258,61 +257,6 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn test_json_write_empty_file() -> Result<()> {
- // Case 1. write to a single file
- // Expect: an empty file created
- let tmp_dir = tempfile::TempDir::new().unwrap();
- let path = format!("{}/empty.json", tmp_dir.path().to_string_lossy());
-
- let ctx = SessionContext::new();
-
- let df = ctx.sql("SELECT 1 limit 0").await?;
-
- let cfg1 =
- crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
- let cfg2 = JsonOptions::default();
-
- df.write_json(&path, cfg1, Some(cfg2)).await?;
- assert!(std::path::Path::new(&path).exists());
-
- // Case 2. write to a directory without partition columns
- // Expect: under the directory, an empty file is created
- let tmp_dir = tempfile::TempDir::new().unwrap();
- let path = format!("{}", tmp_dir.path().to_string_lossy());
-
- let cfg1 =
- crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
- let cfg2 = JsonOptions::default();
-
- let df = ctx.sql("SELECT 1 limit 0").await?;
-
- df.write_json(&path, cfg1, Some(cfg2)).await?;
- assert!(std::path::Path::new(&path).exists());
-
- let files = std::fs::read_dir(&path).unwrap();
- assert!(files.count() == 1);
-
- // Case 3. write to a directory with partition columns
- // Expect: No file is created
- let tmp_dir = tempfile::TempDir::new().unwrap();
- let path = format!("{}", tmp_dir.path().to_string_lossy());
-
- let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
-
- let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
- .with_single_file_output(true)
- .with_partition_by(vec!["col1".to_string()]);
- let cfg2 = JsonOptions::default();
-
- df.write_json(&path, cfg1, Some(cfg2)).await?;
-
- assert!(std::path::Path::new(&path).exists());
- let files = std::fs::read_dir(&path).unwrap();
- assert!(files.count() == 0);
- Ok(())
- }
-
#[test]
fn test_json_deserializer_finish() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 8a2db3431f..385e0f7b9d 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1263,57 +1263,30 @@ mod tests {
}
#[tokio::test]
- async fn test_parquet_write_empty_file() -> Result<()> {
- // Case 1. write to a single file
- // Expect: an empty file created
- let tmp_dir = tempfile::TempDir::new().unwrap();
- let path = format!("{}/empty.parquet", tmp_dir.path().to_string_lossy());
-
- let ctx = SessionContext::new();
-
- let df = ctx.sql("SELECT 1 limit 0").await?;
-
- let cfg1 =
- crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
- let cfg2 = TableParquetOptions::default();
-
- df.write_parquet(&path, cfg1, Some(cfg2)).await?;
- assert!(std::path::Path::new(&path).exists());
+ async fn test_write_empty_recordbatch_creates_file() -> Result<()> {
+ let empty_record_batch = RecordBatch::try_new(
+ Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
+ vec![Arc::new(Int32Array::from(Vec::<i32>::new()))],
+ )
+ .expect("Failed to create empty RecordBatch");
- // Case 2. write to a directory without partition columns
- // Expect: under the directory, an empty file is created
let tmp_dir = tempfile::TempDir::new().unwrap();
- let path = format!("{}", tmp_dir.path().to_string_lossy());
-
- let cfg1 =
- crate::dataframe::DataFrameWriteOptions::new().with_single_file_output(true);
- let cfg2 = TableParquetOptions::default();
+ let path = format!("{}/empty2.parquet", tmp_dir.path().to_string_lossy());
- let df = ctx.sql("SELECT 1 limit 0").await?;
-
- df.write_parquet(&path, cfg1, Some(cfg2)).await?;
+ let ctx = SessionContext::new();
+ let df = ctx.read_batch(empty_record_batch.clone())?;
+ df.write_parquet(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
+ .await?;
assert!(std::path::Path::new(&path).exists());
- let files = std::fs::read_dir(&path).unwrap();
- assert!(files.count() == 1);
-
- // Case 3. write to a directory with partition columns
- // Expect: No file is created
- let tmp_dir = tempfile::TempDir::new().unwrap();
- let path = format!("{}", tmp_dir.path().to_string_lossy());
-
- let df = ctx.sql("SELECT 1 as col1, 2 as col2 limit 0").await?;
-
- let cfg1 = crate::dataframe::DataFrameWriteOptions::new()
- .with_single_file_output(true)
- .with_partition_by(vec!["col1".to_string()]);
- let cfg2 = TableParquetOptions::default();
-
- df.write_parquet(&path, cfg1, Some(cfg2)).await?;
-
- assert!(std::path::Path::new(&path).exists());
- let files = std::fs::read_dir(&path).unwrap();
- assert!(files.count() == 0);
+ let stream = ctx
+ .read_parquet(&path, ParquetReadOptions::new())
+ .await?
+ .execute_stream()
+ .await?;
+ assert_eq!(stream.schema(), empty_record_batch.schema());
+ let results = stream.collect::<Vec<_>>().await;
+ assert_eq!(results.len(), 0);
Ok(())
}
diff --git a/datafusion/datasource/src/file_sink_config.rs b/datafusion/datasource/src/file_sink_config.rs
index 8a86b11a47..2968bd1ee0 100644
--- a/datafusion/datasource/src/file_sink_config.rs
+++ b/datafusion/datasource/src/file_sink_config.rs
@@ -22,14 +22,12 @@ use crate::sink::DataSink;
use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver};
use crate::ListingTableUrl;
-use arrow::array::RecordBatch;
use arrow::datatypes::{DataType, SchemaRef};
use datafusion_common::Result;
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
-use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use async_trait::async_trait;
use object_store::ObjectStore;
@@ -79,34 +77,13 @@ pub trait FileSink: DataSink {
.runtime_env()
.object_store(&config.object_store_url)?;
let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
- let mut num_rows = self
- .spawn_writer_tasks_and_join(
- context,
- demux_task,
- file_stream_rx,
- Arc::clone(&object_store),
- )
- .await?;
- if num_rows == 0 {
- // If no rows were written, then no files are output either.
- // In this case, send an empty record batch through to ensure the output file is generated
- let schema = Arc::clone(&config.output_schema);
- let empty_batch = RecordBatch::new_empty(Arc::clone(&schema));
- let data = Box::pin(RecordBatchStreamAdapter::new(
- schema,
- futures::stream::iter(vec![Ok(empty_batch)]),
- ));
- let (demux_task, file_stream_rx) = start_demuxer_task(config, data, context);
- num_rows = self
- .spawn_writer_tasks_and_join(
- context,
- demux_task,
- file_stream_rx,
- Arc::clone(&object_store),
- )
- .await?;
- }
- Ok(num_rows)
+ self.spawn_writer_tasks_and_join(
+ context,
+ demux_task,
+ file_stream_rx,
+ object_store,
+ )
+ .await
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]