This is an automated email from the ASF dual-hosted git repository.

goldmedal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1fc7769814 fix: Ignore empty files in ListingTable when listing files with or without partition filters, as well as when inferring schema (#13750)
1fc7769814 is described below

commit 1fc776981496c0f0648ece8069992c65a7021c83
Author: Arttu <[email protected]>
AuthorDate: Wed Dec 18 17:05:23 2024 +0100

    fix: Ignore empty files in ListingTable when listing files with or without partition filters, as well as when inferring schema (#13750)
    
    * fix: Ignore empty files in ListingTable when listing files with or without partition filters, as well as when inferring schema
    
    * clippy
    
    * fix csv and json tests
    
    * add testing for parquet
    
    * cleanup
    
    * fix parquet tests
    
    * document describe_partition, add back repartition options to one of the csv empty files tests
---
 datafusion/core/src/datasource/file_format/csv.rs  |  38 ++-----
 datafusion/core/src/datasource/file_format/json.rs |   8 +-
 .../core/src/datasource/file_format/parquet.rs     |  57 ++++++++++-
 datafusion/core/src/datasource/listing/helpers.rs  | 112 ++++++++++++++++++++-
 datafusion/core/src/datasource/listing/table.rs    |   2 +
 5 files changed, 180 insertions(+), 37 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs 
b/datafusion/core/src/datasource/file_format/csv.rs
index 9c96c68286..e9a93475d3 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -1259,18 +1259,13 @@ mod tests {
         Ok(())
     }
 
-    /// Read a single empty csv file in parallel
+    /// Read a single empty csv file
     ///
     /// empty_0_byte.csv:
     /// (file is empty)
-    #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
     #[tokio::test]
-    async fn test_csv_parallel_empty_file(n_partitions: usize) -> Result<()> {
-        let config = SessionConfig::new()
-            .with_repartition_file_scans(true)
-            .with_repartition_file_min_size(0)
-            .with_target_partitions(n_partitions);
-        let ctx = SessionContext::new_with_config(config);
+    async fn test_csv_empty_file() -> Result<()> {
+        let ctx = SessionContext::new();
         ctx.register_csv(
             "empty",
             "tests/data/empty_0_byte.csv",
@@ -1278,32 +1273,24 @@ mod tests {
         )
         .await?;
 
-        // Require a predicate to enable repartition for the optimizer
         let query = "select * from empty where random() > 0.5;";
         let query_result = ctx.sql(query).await?.collect().await?;
-        let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
 
         #[rustfmt::skip]
         let expected = ["++",
             "++"];
         assert_batches_eq!(expected, &query_result);
-        assert_eq!(1, actual_partitions); // Won't get partitioned if all 
files are empty
 
         Ok(())
     }
 
-    /// Read a single empty csv file with header in parallel
+    /// Read a single empty csv file with header
     ///
     /// empty.csv:
     /// c1,c2,c3
-    #[rstest(n_partitions, case(1), case(2), case(3))]
     #[tokio::test]
-    async fn test_csv_parallel_empty_with_header(n_partitions: usize) -> 
Result<()> {
-        let config = SessionConfig::new()
-            .with_repartition_file_scans(true)
-            .with_repartition_file_min_size(0)
-            .with_target_partitions(n_partitions);
-        let ctx = SessionContext::new_with_config(config);
+    async fn test_csv_empty_with_header() -> Result<()> {
+        let ctx = SessionContext::new();
         ctx.register_csv(
             "empty",
             "tests/data/empty.csv",
@@ -1311,21 +1298,18 @@ mod tests {
         )
         .await?;
 
-        // Require a predicate to enable repartition for the optimizer
         let query = "select * from empty where random() > 0.5;";
         let query_result = ctx.sql(query).await?.collect().await?;
-        let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
 
         #[rustfmt::skip]
         let expected = ["++",
             "++"];
         assert_batches_eq!(expected, &query_result);
-        assert_eq!(n_partitions, actual_partitions);
 
         Ok(())
     }
 
-    /// Read multiple empty csv files in parallel
+    /// Read multiple empty csv files
     ///
     /// all_empty
     /// ├── empty0.csv
@@ -1334,13 +1318,13 @@ mod tests {
     ///
     /// empty0.csv/empty1.csv/empty2.csv:
     /// (file is empty)
-    #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
     #[tokio::test]
-    async fn test_csv_parallel_multiple_empty_files(n_partitions: usize) -> 
Result<()> {
+    async fn test_csv_multiple_empty_files() -> Result<()> {
+        // Testing that partitioning doesn't break with empty files
         let config = SessionConfig::new()
             .with_repartition_file_scans(true)
             .with_repartition_file_min_size(0)
-            .with_target_partitions(n_partitions);
+            .with_target_partitions(4);
         let ctx = SessionContext::new_with_config(config);
         let file_format = 
Arc::new(CsvFormat::default().with_has_header(false));
         let listing_options = ListingOptions::new(file_format.clone())
@@ -1358,13 +1342,11 @@ mod tests {
         // Require a predicate to enable repartition for the optimizer
         let query = "select * from empty where random() > 0.5;";
         let query_result = ctx.sql(query).await?.collect().await?;
-        let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
 
         #[rustfmt::skip]
         let expected = ["++",
             "++"];
         assert_batches_eq!(expected, &query_result);
-        assert_eq!(1, actual_partitions); // Won't get partitioned if all 
files are empty
 
         Ok(())
     }
diff --git a/datafusion/core/src/datasource/file_format/json.rs 
b/datafusion/core/src/datasource/file_format/json.rs
index e97853e9e7..4bdf336881 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -619,13 +619,11 @@ mod tests {
         Ok(())
     }
 
-    #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
     #[tokio::test]
-    async fn it_can_read_empty_ndjson_in_parallel(n_partitions: usize) -> 
Result<()> {
+    async fn it_can_read_empty_ndjson() -> Result<()> {
         let config = SessionConfig::new()
             .with_repartition_file_scans(true)
-            .with_repartition_file_min_size(0)
-            .with_target_partitions(n_partitions);
+            .with_repartition_file_min_size(0);
 
         let ctx = SessionContext::new_with_config(config);
 
@@ -638,7 +636,6 @@ mod tests {
         let query = "SELECT * FROM json_parallel_empty WHERE random() > 0.5;";
 
         let result = ctx.sql(query).await?.collect().await?;
-        let actual_partitions = count_num_partitions(&ctx, query).await?;
 
         #[rustfmt::skip]
         let expected = [
@@ -647,7 +644,6 @@ mod tests {
         ];
 
         assert_batches_eq!(expected, &result);
-        assert_eq!(1, actual_partitions);
 
         Ok(())
     }
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs 
b/datafusion/core/src/datasource/file_format/parquet.rs
index 1d08de1722..383fd65752 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1312,7 +1312,7 @@ mod tests {
 
     use crate::datasource::file_format::parquet::test_util::store_parquet;
     use crate::physical_plan::metrics::MetricValue;
-    use crate::prelude::{SessionConfig, SessionContext};
+    use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
     use arrow::array::{Array, ArrayRef, StringArray};
     use arrow_array::types::Int32Type;
     use arrow_array::{DictionaryArray, Int32Array, Int64Array};
@@ -1323,8 +1323,8 @@ mod tests {
         as_float64_array, as_int32_array, as_timestamp_nanosecond_array,
     };
     use datafusion_common::config::ParquetOptions;
-    use datafusion_common::ScalarValue;
     use datafusion_common::ScalarValue::Utf8;
+    use datafusion_common::{assert_batches_eq, ScalarValue};
     use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_execution::runtime_env::RuntimeEnv;
     use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
@@ -2251,6 +2251,59 @@ mod tests {
         scan_format(state, &*format, &testdata, file_name, projection, 
limit).await
     }
 
+    /// Test that 0-byte files don't break while reading
+    #[tokio::test]
+    async fn test_read_empty_parquet() -> Result<()> {
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}/empty.parquet", 
tmp_dir.path().to_string_lossy());
+        File::create(&path).await?;
+
+        let ctx = SessionContext::new();
+
+        let df = ctx
+            .read_parquet(&path, ParquetReadOptions::default())
+            .await
+            .expect("read_parquet should succeed");
+
+        let result = df.collect().await?;
+        #[rustfmt::skip]
+        let expected = ["++",
+            "++"];
+        assert_batches_eq!(expected, &result);
+
+        Ok(())
+    }
+
+    /// Test that 0-byte files don't break while reading
+    #[tokio::test]
+    async fn test_read_partitioned_empty_parquet() -> Result<()> {
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let partition_dir = tmp_dir.path().join("col1=a");
+        std::fs::create_dir(&partition_dir).unwrap();
+        File::create(partition_dir.join("empty.parquet"))
+            .await
+            .unwrap();
+
+        let ctx = SessionContext::new();
+
+        let df = ctx
+            .read_parquet(
+                tmp_dir.path().to_str().unwrap(),
+                ParquetReadOptions::new()
+                    .table_partition_cols(vec![("col1".to_string(), 
DataType::Utf8)]),
+            )
+            .await
+            .expect("read_parquet should succeed");
+
+        let result = df.collect().await?;
+        #[rustfmt::skip]
+        let expected = ["++",
+            "++"];
+        assert_batches_eq!(expected, &result);
+
+        Ok(())
+    }
+
     fn build_ctx(store_url: &url::Url) -> Arc<TaskContext> {
         let tmp_dir = tempfile::TempDir::new().unwrap();
         let local = Arc::new(
diff --git a/datafusion/core/src/datasource/listing/helpers.rs 
b/datafusion/core/src/datasource/listing/helpers.rs
index a601aec32f..228b9a4e9f 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -171,7 +171,13 @@ impl Partition {
         trace!("Listing partition {}", self.path);
         let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
         let result = store.list_with_delimiter(prefix).await?;
-        self.files = Some(result.objects);
+        self.files = Some(
+            result
+                .objects
+                .into_iter()
+                .filter(|object_meta| object_meta.size > 0)
+                .collect(),
+        );
         Ok((self, result.common_prefixes))
     }
 }
@@ -418,6 +424,7 @@ pub async fn pruned_partition_list<'a>(
             table_path
                 .list_all_files(ctx, store, file_extension)
                 .await?
+                .try_filter(|object_meta| 
futures::future::ready(object_meta.size > 0))
                 .map_ok(|object_meta| object_meta.into()),
         ));
     }
@@ -566,6 +573,7 @@ mod tests {
     async fn test_pruned_partition_list_empty() {
         let (store, state) = make_test_store_and_state(&[
             ("tablepath/mypartition=val1/notparquetfile", 100),
+            ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
             ("tablepath/file.parquet", 100),
         ]);
         let filter = Expr::eq(col("mypartition"), lit("val1"));
@@ -590,6 +598,7 @@ mod tests {
         let (store, state) = make_test_store_and_state(&[
             ("tablepath/mypartition=val1/file.parquet", 100),
             ("tablepath/mypartition=val2/file.parquet", 100),
+            ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
             ("tablepath/mypartition=val1/other=val3/file.parquet", 100),
         ]);
         let filter = Expr::eq(col("mypartition"), lit("val1"));
@@ -671,6 +680,107 @@ mod tests {
         );
     }
 
+    /// Describe a partition as a (path, depth, files) tuple for easier 
assertions
+    fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
+        (
+            partition.path.as_ref(),
+            partition.depth,
+            partition
+                .files
+                .as_ref()
+                .map(|f| f.iter().map(|f| 
f.location.filename().unwrap()).collect())
+                .unwrap_or_default(),
+        )
+    }
+
+    #[tokio::test]
+    async fn test_list_partition() {
+        let (store, _) = make_test_store_and_state(&[
+            ("tablepath/part1=p1v1/file.parquet", 100),
+            ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
+            ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
+            ("tablepath/part1=p1v3/part2=p2v1/file3.parquet", 100),
+            ("tablepath/part1=p1v2/part2=p2v2/file4.parquet", 100),
+            ("tablepath/part1=p1v2/part2=p2v2/empty.parquet", 0),
+        ]);
+
+        let partitions = list_partitions(
+            store.as_ref(),
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
+            0,
+            None,
+        )
+        .await
+        .expect("listing partitions failed");
+
+        assert_eq!(
+            &partitions
+                .iter()
+                .map(describe_partition)
+                .collect::<Vec<_>>(),
+            &vec![
+                ("tablepath", 0, vec![]),
+                ("tablepath/part1=p1v1", 1, vec![]),
+                ("tablepath/part1=p1v2", 1, vec![]),
+                ("tablepath/part1=p1v3", 1, vec![]),
+            ]
+        );
+
+        let partitions = list_partitions(
+            store.as_ref(),
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
+            1,
+            None,
+        )
+        .await
+        .expect("listing partitions failed");
+
+        assert_eq!(
+            &partitions
+                .iter()
+                .map(describe_partition)
+                .collect::<Vec<_>>(),
+            &vec![
+                ("tablepath", 0, vec![]),
+                ("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
+                ("tablepath/part1=p1v2", 1, vec![]),
+                ("tablepath/part1=p1v2/part2=p2v1", 2, vec![]),
+                ("tablepath/part1=p1v2/part2=p2v2", 2, vec![]),
+                ("tablepath/part1=p1v3", 1, vec![]),
+                ("tablepath/part1=p1v3/part2=p2v1", 2, vec![]),
+            ]
+        );
+
+        let partitions = list_partitions(
+            store.as_ref(),
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
+            2,
+            None,
+        )
+        .await
+        .expect("listing partitions failed");
+
+        assert_eq!(
+            &partitions
+                .iter()
+                .map(describe_partition)
+                .collect::<Vec<_>>(),
+            &vec![
+                ("tablepath", 0, vec![]),
+                ("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
+                ("tablepath/part1=p1v2", 1, vec![]),
+                ("tablepath/part1=p1v3", 1, vec![]),
+                (
+                    "tablepath/part1=p1v2/part2=p2v1",
+                    2,
+                    vec!["file1.parquet", "file2.parquet"]
+                ),
+                ("tablepath/part1=p1v2/part2=p2v2", 2, vec!["file4.parquet"]),
+                ("tablepath/part1=p1v3/part2=p2v1", 2, vec!["file3.parquet"]),
+            ]
+        );
+    }
+
     #[test]
     fn test_parse_partitions_for_path() {
         assert_eq!(
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index b12f37ed75..791b15704d 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -470,6 +470,8 @@ impl ListingOptions {
         let files: Vec<_> = table_path
             .list_all_files(state, store.as_ref(), &self.file_extension)
             .await?
+            // Empty files cannot affect schema but may throw when trying to 
read for it
+            .try_filter(|object_meta| future::ready(object_meta.size > 0))
             .try_collect()
             .await?;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to