This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ce3d446be5 Disable parallel reading for gziped ndjson file (#9799)
ce3d446be5 is described below

commit ce3d446be5f6a11664e100fc47940e6ecb5418d3
Author: Lordworms <[email protected]>
AuthorDate: Thu Mar 28 09:47:37 2024 -0500

    Disable parallel reading for gziped ndjson file (#9799)
    
    * for debug
    
    * disable paralle reading for gziped ndjson file
    
    * directly return None
    
    * delete .gz
    
    * fix clippy
---
 .../core/src/datasource/physical_plan/json.rs      | 52 +++++++++++++++++++++-
 1 file changed, 51 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 194a4a91c3..c876b3d078 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -150,6 +150,9 @@ impl ExecutionPlan for NdJsonExec {
         target_partitions: usize,
         config: &datafusion_common::config::ConfigOptions,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        if self.file_compression_type == FileCompressionType::GZIP {
+            return Ok(None);
+        }
         let repartition_file_min_size = config.optimizer.repartition_file_min_size;
         let preserve_order_within_groups = self.properties().output_ordering().is_some();
         let file_groups = &self.base_config.file_groups;
@@ -392,11 +395,14 @@ mod tests {
     use arrow::datatypes::{Field, SchemaBuilder};
     use datafusion_common::cast::{as_int32_array, as_int64_array, as_string_array};
     use datafusion_common::FileType;
-
+    use flate2::write::GzEncoder;
+    use flate2::Compression;
     use futures::StreamExt;
     use object_store::chunked::ChunkedStore;
     use object_store::local::LocalFileSystem;
     use rstest::*;
+    use std::fs::File;
+    use std::io;
     use tempfile::TempDir;
     use url::Url;
 
@@ -884,4 +890,48 @@ mod tests {
 
         Ok(())
     }
+    fn compress_file(path: &str, output_path: &str) -> io::Result<()> {
+        let input_file = File::open(path)?;
+        let mut reader = BufReader::new(input_file);
+
+        let output_file = File::create(output_path)?;
+        let writer = std::io::BufWriter::new(output_file);
+
+        let mut encoder = GzEncoder::new(writer, Compression::default());
+        io::copy(&mut reader, &mut encoder)?;
+
+        encoder.finish()?;
+        Ok(())
+    }
+    #[tokio::test]
+    async fn test_disable_parallel_for_json_gz() -> Result<()> {
+        let config = SessionConfig::new()
+            .with_repartition_file_scans(true)
+            .with_repartition_file_min_size(0)
+            .with_target_partitions(4);
+        let ctx = SessionContext::new_with_config(config);
+        let path = format!("{TEST_DATA_BASE}/1.json");
+        let compressed_path = format!("{}.gz", &path);
+        compress_file(&path, &compressed_path)?;
+        let read_option = NdJsonReadOptions::default()
+            .file_compression_type(FileCompressionType::GZIP)
+            .file_extension("gz");
+        let df = ctx.read_json(compressed_path.clone(), read_option).await?;
+        let res = df.collect().await;
+        fs::remove_file(&compressed_path)?;
+        assert_batches_eq!(
+            &[
+                "+-----+------------------+---------------+------+",
+                "| a   | b                | c             | d    |",
+                "+-----+------------------+---------------+------+",
+                "| 1   | [2.0, 1.3, -6.1] | [false, true] | 4    |",
+                "| -10 | [2.0, 1.3, -6.1] | [true, true]  | 4    |",
+                "| 2   | [2.0, , -6.1]    | [false, ]     | text |",
+                "|     |                  |               |      |",
+                "+-----+------------------+---------------+------+",
+            ],
+            &res?
+        );
+        Ok(())
+    }
 }

Reply via email to