This is an automated email from the ASF dual-hosted git repository.
goldmedal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 1fc7769814 fix: Ignore empty files in ListingTable when listing files with or without partition filters, as well as when inferring schema (#13750)
1fc7769814 is described below
commit 1fc776981496c0f0648ece8069992c65a7021c83
Author: Arttu <[email protected]>
AuthorDate: Wed Dec 18 17:05:23 2024 +0100
fix: Ignore empty files in ListingTable when listing files with or without partition filters, as well as when inferring schema (#13750)
* fix: Ignore empty files in ListingTable when listing files with or without partition filters, as well as when inferring schema
* clippy
* fix csv and json tests
* add testing for parquet
* cleanup
* fix parquet tests
* document describe_partition, add back repartition options to one of the csv empty files tests
---
datafusion/core/src/datasource/file_format/csv.rs | 38 ++-----
datafusion/core/src/datasource/file_format/json.rs | 8 +-
.../core/src/datasource/file_format/parquet.rs | 57 ++++++++++-
datafusion/core/src/datasource/listing/helpers.rs | 112 ++++++++++++++++++++-
datafusion/core/src/datasource/listing/table.rs | 2 +
5 files changed, 180 insertions(+), 37 deletions(-)
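
The fix applies one guard in three places: listing files for a partition, pruning a partition list, and inferring the table schema. The guard itself is just a size check on each listed object; a minimal standalone sketch of the idea (the `FileMeta` struct here is illustrative, not DataFusion's `ObjectMeta`):

    // Illustrative stand-in for object_store's ObjectMeta.
    struct FileMeta {
        path: String,
        size: u64,
    }

    /// Keep only files that can actually contain data; 0-byte files
    /// are dropped before any reader ever opens them.
    fn non_empty(files: Vec<FileMeta>) -> Vec<FileMeta> {
        files.into_iter().filter(|f| f.size > 0).collect()
    }
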
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 9c96c68286..e9a93475d3 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -1259,18 +1259,13 @@ mod tests {
Ok(())
}
- /// Read a single empty csv file in parallel
+ /// Read a single empty csv file
///
/// empty_0_byte.csv:
/// (file is empty)
- #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
#[tokio::test]
- async fn test_csv_parallel_empty_file(n_partitions: usize) -> Result<()> {
- let config = SessionConfig::new()
- .with_repartition_file_scans(true)
- .with_repartition_file_min_size(0)
- .with_target_partitions(n_partitions);
- let ctx = SessionContext::new_with_config(config);
+ async fn test_csv_empty_file() -> Result<()> {
+ let ctx = SessionContext::new();
ctx.register_csv(
"empty",
"tests/data/empty_0_byte.csv",
@@ -1278,32 +1273,24 @@ mod tests {
)
.await?;
- // Require a predicate to enable repartition for the optimizer
let query = "select * from empty where random() > 0.5;";
let query_result = ctx.sql(query).await?.collect().await?;
- let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
#[rustfmt::skip]
let expected = ["++",
"++"];
assert_batches_eq!(expected, &query_result);
- assert_eq!(1, actual_partitions); // Won't get partitioned if all files are empty
Ok(())
}
- /// Read a single empty csv file with header in parallel
+ /// Read a single empty csv file with header
///
/// empty.csv:
/// c1,c2,c3
- #[rstest(n_partitions, case(1), case(2), case(3))]
#[tokio::test]
- async fn test_csv_parallel_empty_with_header(n_partitions: usize) -> Result<()> {
- let config = SessionConfig::new()
- .with_repartition_file_scans(true)
- .with_repartition_file_min_size(0)
- .with_target_partitions(n_partitions);
- let ctx = SessionContext::new_with_config(config);
+ async fn test_csv_empty_with_header() -> Result<()> {
+ let ctx = SessionContext::new();
ctx.register_csv(
"empty",
"tests/data/empty.csv",
@@ -1311,21 +1298,18 @@ mod tests {
)
.await?;
- // Require a predicate to enable repartition for the optimizer
let query = "select * from empty where random() > 0.5;";
let query_result = ctx.sql(query).await?.collect().await?;
- let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
#[rustfmt::skip]
let expected = ["++",
"++"];
assert_batches_eq!(expected, &query_result);
- assert_eq!(n_partitions, actual_partitions);
Ok(())
}
- /// Read multiple empty csv files in parallel
+ /// Read multiple empty csv files
///
/// all_empty
/// ├── empty0.csv
@@ -1334,13 +1318,13 @@ mod tests {
///
/// empty0.csv/empty1.csv/empty2.csv:
/// (file is empty)
- #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
#[tokio::test]
- async fn test_csv_parallel_multiple_empty_files(n_partitions: usize) -> Result<()> {
+ async fn test_csv_multiple_empty_files() -> Result<()> {
+ // Testing that partitioning doesn't break with empty files
let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
- .with_target_partitions(n_partitions);
+ .with_target_partitions(4);
let ctx = SessionContext::new_with_config(config);
let file_format = Arc::new(CsvFormat::default().with_has_header(false));
let listing_options = ListingOptions::new(file_format.clone())
@@ -1358,13 +1342,11 @@ mod tests {
// Require a predicate to enable repartition for the optimizer
let query = "select * from empty where random() > 0.5;";
let query_result = ctx.sql(query).await?.collect().await?;
- let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
#[rustfmt::skip]
let expected = ["++",
"++"];
assert_batches_eq!(expected, &query_result);
- assert_eq!(1, actual_partitions); // Won't get partitioned if all files are empty
Ok(())
}
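
With the rstest partition cases removed, the CSV tests above reduce to the user-visible behavior: scanning a 0-byte CSV succeeds and returns no rows. A sketch of that behavior, assuming `tests/data/empty_0_byte.csv` is a 0-byte file:

    use datafusion::error::Result;
    use datafusion::prelude::{CsvReadOptions, SessionContext};

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Registration no longer trips over the empty file during
        // schema inference, and the scan yields zero rows.
        ctx.register_csv("empty", "tests/data/empty_0_byte.csv", CsvReadOptions::new())
            .await?;
        let batches = ctx.sql("select * from empty").await?.collect().await?;
        assert!(batches.iter().all(|b| b.num_rows() == 0));
        Ok(())
    }
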
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index e97853e9e7..4bdf336881 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -619,13 +619,11 @@ mod tests {
Ok(())
}
- #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
#[tokio::test]
- async fn it_can_read_empty_ndjson_in_parallel(n_partitions: usize) -> Result<()> {
+ async fn it_can_read_empty_ndjson() -> Result<()> {
let config = SessionConfig::new()
.with_repartition_file_scans(true)
- .with_repartition_file_min_size(0)
- .with_target_partitions(n_partitions);
+ .with_repartition_file_min_size(0);
let ctx = SessionContext::new_with_config(config);
@@ -638,7 +636,6 @@ mod tests {
let query = "SELECT * FROM json_parallel_empty WHERE random() > 0.5;";
let result = ctx.sql(query).await?.collect().await?;
- let actual_partitions = count_num_partitions(&ctx, query).await?;
#[rustfmt::skip]
let expected = [
@@ -647,7 +644,6 @@ mod tests {
];
assert_batches_eq!(expected, &result);
- assert_eq!(1, actual_partitions);
Ok(())
}
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 1d08de1722..383fd65752 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1312,7 +1312,7 @@ mod tests {
use crate::datasource::file_format::parquet::test_util::store_parquet;
use crate::physical_plan::metrics::MetricValue;
- use crate::prelude::{SessionConfig, SessionContext};
+ use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use arrow::array::{Array, ArrayRef, StringArray};
use arrow_array::types::Int32Type;
use arrow_array::{DictionaryArray, Int32Array, Int64Array};
@@ -1323,8 +1323,8 @@ mod tests {
as_float64_array, as_int32_array, as_timestamp_nanosecond_array,
};
use datafusion_common::config::ParquetOptions;
- use datafusion_common::ScalarValue;
use datafusion_common::ScalarValue::Utf8;
+ use datafusion_common::{assert_batches_eq, ScalarValue};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
@@ -2251,6 +2251,59 @@ mod tests {
scan_format(state, &*format, &testdata, file_name, projection, limit).await
}
+ /// Test that 0-byte files don't break while reading
+ #[tokio::test]
+ async fn test_read_empty_parquet() -> Result<()> {
+ let tmp_dir = tempfile::TempDir::new().unwrap();
+ let path = format!("{}/empty.parquet", tmp_dir.path().to_string_lossy());
+ File::create(&path).await?;
+
+ let ctx = SessionContext::new();
+
+ let df = ctx
+ .read_parquet(&path, ParquetReadOptions::default())
+ .await
+ .expect("read_parquet should succeed");
+
+ let result = df.collect().await?;
+ #[rustfmt::skip]
+ let expected = ["++",
+ "++"];
+ assert_batches_eq!(expected, &result);
+
+ Ok(())
+ }
+
+ /// Test that 0-byte files in a partitioned directory don't break while reading
+ #[tokio::test]
+ async fn test_read_partitioned_empty_parquet() -> Result<()> {
+ let tmp_dir = tempfile::TempDir::new().unwrap();
+ let partition_dir = tmp_dir.path().join("col1=a");
+ std::fs::create_dir(&partition_dir).unwrap();
+ File::create(partition_dir.join("empty.parquet"))
+ .await
+ .unwrap();
+
+ let ctx = SessionContext::new();
+
+ let df = ctx
+ .read_parquet(
+ tmp_dir.path().to_str().unwrap(),
+ ParquetReadOptions::new()
+ .table_partition_cols(vec![("col1".to_string(), DataType::Utf8)]),
+ )
+ .await
+ .expect("read_parquet should succeed");
+
+ let result = df.collect().await?;
+ #[rustfmt::skip]
+ let expected = ["++",
+ "++"];
+ assert_batches_eq!(expected, &result);
+
+ Ok(())
+ }
+
fn build_ctx(store_url: &url::Url) -> Arc<TaskContext> {
let tmp_dir = tempfile::TempDir::new().unwrap();
let local = Arc::new(
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index a601aec32f..228b9a4e9f 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -171,7 +171,13 @@ impl Partition {
trace!("Listing partition {}", self.path);
let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
let result = store.list_with_delimiter(prefix).await?;
- self.files = Some(result.objects);
+ self.files = Some(
+ result
+ .objects
+ .into_iter()
+ .filter(|object_meta| object_meta.size > 0)
+ .collect(),
+ );
Ok((self, result.common_prefixes))
}
}
@@ -418,6 +424,7 @@ pub async fn pruned_partition_list<'a>(
table_path
.list_all_files(ctx, store, file_extension)
.await?
+ .try_filter(|object_meta| futures::future::ready(object_meta.size > 0))
.map_ok(|object_meta| object_meta.into()),
));
}
@@ -566,6 +573,7 @@ mod tests {
async fn test_pruned_partition_list_empty() {
let (store, state) = make_test_store_and_state(&[
("tablepath/mypartition=val1/notparquetfile", 100),
+ ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
("tablepath/file.parquet", 100),
]);
let filter = Expr::eq(col("mypartition"), lit("val1"));
@@ -590,6 +598,7 @@ mod tests {
let (store, state) = make_test_store_and_state(&[
("tablepath/mypartition=val1/file.parquet", 100),
("tablepath/mypartition=val2/file.parquet", 100),
+ ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
("tablepath/mypartition=val1/other=val3/file.parquet", 100),
]);
let filter = Expr::eq(col("mypartition"), lit("val1"));
@@ -671,6 +680,107 @@ mod tests {
);
}
+ /// Describe a partition as a (path, depth, files) tuple for easier assertions
+ fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
+ (
+ partition.path.as_ref(),
+ partition.depth,
+ partition
+ .files
+ .as_ref()
+ .map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect())
+ .unwrap_or_default(),
+ )
+ }
+
+ #[tokio::test]
+ async fn test_list_partition() {
+ let (store, _) = make_test_store_and_state(&[
+ ("tablepath/part1=p1v1/file.parquet", 100),
+ ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
+ ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
+ ("tablepath/part1=p1v3/part2=p2v1/file3.parquet", 100),
+ ("tablepath/part1=p1v2/part2=p2v2/file4.parquet", 100),
+ ("tablepath/part1=p1v2/part2=p2v2/empty.parquet", 0),
+ ]);
+
+ let partitions = list_partitions(
+ store.as_ref(),
+ &ListingTableUrl::parse("file:///tablepath/").unwrap(),
+ 0,
+ None,
+ )
+ .await
+ .expect("listing partitions failed");
+
+ assert_eq!(
+ &partitions
+ .iter()
+ .map(describe_partition)
+ .collect::<Vec<_>>(),
+ &vec![
+ ("tablepath", 0, vec![]),
+ ("tablepath/part1=p1v1", 1, vec![]),
+ ("tablepath/part1=p1v2", 1, vec![]),
+ ("tablepath/part1=p1v3", 1, vec![]),
+ ]
+ );
+
+ let partitions = list_partitions(
+ store.as_ref(),
+ &ListingTableUrl::parse("file:///tablepath/").unwrap(),
+ 1,
+ None,
+ )
+ .await
+ .expect("listing partitions failed");
+
+ assert_eq!(
+ &partitions
+ .iter()
+ .map(describe_partition)
+ .collect::<Vec<_>>(),
+ &vec![
+ ("tablepath", 0, vec![]),
+ ("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
+ ("tablepath/part1=p1v2", 1, vec![]),
+ ("tablepath/part1=p1v2/part2=p2v1", 2, vec![]),
+ ("tablepath/part1=p1v2/part2=p2v2", 2, vec![]),
+ ("tablepath/part1=p1v3", 1, vec![]),
+ ("tablepath/part1=p1v3/part2=p2v1", 2, vec![]),
+ ]
+ );
+
+ let partitions = list_partitions(
+ store.as_ref(),
+ &ListingTableUrl::parse("file:///tablepath/").unwrap(),
+ 2,
+ None,
+ )
+ .await
+ .expect("listing partitions failed");
+
+ assert_eq!(
+ &partitions
+ .iter()
+ .map(describe_partition)
+ .collect::<Vec<_>>(),
+ &vec![
+ ("tablepath", 0, vec![]),
+ ("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
+ ("tablepath/part1=p1v2", 1, vec![]),
+ ("tablepath/part1=p1v3", 1, vec![]),
+ (
+ "tablepath/part1=p1v2/part2=p2v1",
+ 2,
+ vec!["file1.parquet", "file2.parquet"]
+ ),
+ ("tablepath/part1=p1v2/part2=p2v2", 2, vec!["file4.parquet"]),
+ ("tablepath/part1=p1v3/part2=p2v1", 2, vec!["file3.parquet"]),
+ ]
+ );
+ }
+
#[test]
fn test_parse_partitions_for_path() {
assert_eq!(
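
The `try_filter` added to `pruned_partition_list` above operates on a fallible stream: the predicate returns a future of `bool`, and `Err` items flow through untouched. A self-contained sketch against a hypothetical listing of `(path, size)` pairs, mirroring the `object_meta.size > 0` guard:

    use futures::{stream, TryStreamExt};

    #[tokio::main]
    async fn main() {
        // Stand-in for the file listing stream: Ok((path, size)) items.
        let listing = stream::iter(vec![
            Ok::<_, std::io::Error>(("file.parquet", 100u64)),
            Ok(("empty.parquet", 0)),
        ]);

        // Drop 0-byte entries; errors would pass through unchanged.
        let non_empty: Vec<_> = listing
            .try_filter(|(_, size)| futures::future::ready(*size > 0))
            .try_collect()
            .await
            .unwrap();

        assert_eq!(non_empty, vec![("file.parquet", 100)]);
    }
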
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index b12f37ed75..791b15704d 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -470,6 +470,8 @@ impl ListingOptions {
let files: Vec<_> = table_path
.list_all_files(state, store.as_ref(), &self.file_extension)
.await?
+ // Empty files cannot affect schema but may throw when trying to read them
+ .try_filter(|object_meta| future::ready(object_meta.size > 0))
.try_collect()
.await?;
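
Together, the three guards mean a table directory that mixes real and 0-byte Parquet files can be registered and queried without errors at either schema-inference or scan time. A hedged end-to-end sketch, assuming such a `data/` directory exists:

    use datafusion::error::Result;
    use datafusion::prelude::{ParquetReadOptions, SessionContext};

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Empty .parquet files are skipped both when sampling files
        // for the schema and when listing files for the scan.
        ctx.register_parquet("t", "data/", ParquetReadOptions::default())
            .await?;
        let batches = ctx.sql("select count(*) from t").await?.collect().await?;
        println!("{batches:?}");
        Ok(())
    }
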