This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c5d72b86c fix: avoid compressed json files repartitioning (#10470)
0c5d72b86c is described below

commit 0c5d72b86c419ffcf320cb58e7e30fe5402e342e
Author: Eduard Karacharov <[email protected]>
AuthorDate: Tue May 14 14:30:17 2024 +0300

    fix: avoid compressed json files repartitioning (#10470)
---
 .../core/src/datasource/physical_plan/json.rs      | 79 ++++++++++++++--------
 1 file changed, 52 insertions(+), 27 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 2ec1b91d08..0180caa850 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -154,7 +154,7 @@ impl ExecutionPlan for NdJsonExec {
         target_partitions: usize,
         config: &datafusion_common::config::ConfigOptions,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-        if self.file_compression_type == FileCompressionType::GZIP {
+        if self.file_compression_type.is_compressed() {
             return Ok(None);
         }
         let repartition_file_min_size = config.optimizer.repartition_file_min_size;
@@ -383,7 +383,6 @@ mod tests {
     use std::path::Path;
 
     use super::*;
-    use crate::assert_batches_eq;
     use crate::dataframe::DataFrameWriteOptions;
     use crate::datasource::file_format::file_compression_type::FileTypeExt;
     use crate::datasource::file_format::{json::JsonFormat, FileFormat};
@@ -393,18 +392,15 @@ mod tests {
         CsvReadOptions, NdJsonReadOptions, SessionConfig, SessionContext,
     };
     use crate::test::partitioned_file_groups;
+    use crate::{assert_batches_eq, assert_batches_sorted_eq};
 
     use arrow::array::Array;
     use arrow::datatypes::{Field, SchemaBuilder};
     use datafusion_common::cast::{as_int32_array, as_int64_array, as_string_array};
     use datafusion_common::FileType;
-    use flate2::write::GzEncoder;
-    use flate2::Compression;
     use object_store::chunked::ChunkedStore;
     use object_store::local::LocalFileSystem;
     use rstest::*;
-    use std::fs::File;
-    use std::io;
     use tempfile::TempDir;
     use url::Url;
 
@@ -892,36 +888,65 @@ mod tests {
 
         Ok(())
     }
-    fn compress_file(path: &str, output_path: &str) -> io::Result<()> {
-        let input_file = File::open(path)?;
-        let mut reader = BufReader::new(input_file);
 
-        let output_file = File::create(output_path)?;
-        let writer = std::io::BufWriter::new(output_file);
-
-        let mut encoder = GzEncoder::new(writer, Compression::default());
-        io::copy(&mut reader, &mut encoder)?;
-
-        encoder.finish()?;
-        Ok(())
-    }
+    #[rstest(
+        file_compression_type,
+        case::uncompressed(FileCompressionType::UNCOMPRESSED),
+        case::gzip(FileCompressionType::GZIP),
+        case::bzip2(FileCompressionType::BZIP2),
+        case::xz(FileCompressionType::XZ),
+        case::zstd(FileCompressionType::ZSTD)
+    )]
+    #[cfg(feature = "compression")]
     #[tokio::test]
-    async fn test_disable_parallel_for_json_gz() -> Result<()> {
+    async fn test_json_with_repartitioing(
+        file_compression_type: FileCompressionType,
+    ) -> Result<()> {
         let config = SessionConfig::new()
             .with_repartition_file_scans(true)
             .with_repartition_file_min_size(0)
             .with_target_partitions(4);
         let ctx = SessionContext::new_with_config(config);
-        let path = format!("{TEST_DATA_BASE}/1.json");
-        let compressed_path = format!("{}.gz", &path);
-        compress_file(&path, &compressed_path)?;
+
+        let tmp_dir = TempDir::new()?;
+        let (store_url, file_groups, _) =
+            prepare_store(&ctx.state(), file_compression_type, tmp_dir.path()).await;
+
+        // It's important to have less than `target_partitions` amount of file groups, to
+        // trigger repartitioning.
+        assert_eq!(
+            file_groups.len(),
+            1,
+            "Expected prepared store with single file group"
+        );
+
+        let path = file_groups
+            .first()
+            .unwrap()
+            .first()
+            .unwrap()
+            .object_meta
+            .location
+            .as_ref();
+
+        let url: &Url = store_url.as_ref();
+        let path_buf = Path::new(url.path()).join(path);
+        let path = path_buf.to_str().unwrap();
+        let ext = FileType::JSON
+            .get_ext_with_compression(file_compression_type.to_owned())
+            .unwrap();
+
         let read_option = NdJsonReadOptions::default()
-            .file_compression_type(FileCompressionType::GZIP)
-            .file_extension("gz");
-        let df = ctx.read_json(compressed_path.clone(), read_option).await?;
+            .file_compression_type(file_compression_type)
+            .file_extension(ext.as_str());
+
+        let df = ctx.read_json(path, read_option).await?;
         let res = df.collect().await;
-        fs::remove_file(&compressed_path)?;
-        assert_batches_eq!(
+
+        // Output sort order is nondeterministic due to multiple
+        // target partitions. To handle it, assert compares sorted
+        // result.
+        assert_batches_sorted_eq!(
             &[
                 "+-----+------------------+---------------+------+",
                 "| a   | b                | c             | d    |",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to