This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0c5d72b86c fix: avoid compressed json files repartitioning (#10470)
0c5d72b86c is described below
commit 0c5d72b86c419ffcf320cb58e7e30fe5402e342e
Author: Eduard Karacharov <[email protected]>
AuthorDate: Tue May 14 14:30:17 2024 +0300
fix: avoid compressed json files repartitioning (#10470)
---
.../core/src/datasource/physical_plan/json.rs | 79 ++++++++++++++--------
1 file changed, 52 insertions(+), 27 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 2ec1b91d08..0180caa850 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -154,7 +154,7 @@ impl ExecutionPlan for NdJsonExec {
target_partitions: usize,
config: &datafusion_common::config::ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
- if self.file_compression_type == FileCompressionType::GZIP {
+ if self.file_compression_type.is_compressed() {
return Ok(None);
}
let repartition_file_min_size =
config.optimizer.repartition_file_min_size;
@@ -383,7 +383,6 @@ mod tests {
use std::path::Path;
use super::*;
- use crate::assert_batches_eq;
use crate::dataframe::DataFrameWriteOptions;
use crate::datasource::file_format::file_compression_type::FileTypeExt;
use crate::datasource::file_format::{json::JsonFormat, FileFormat};
@@ -393,18 +392,15 @@ mod tests {
CsvReadOptions, NdJsonReadOptions, SessionConfig, SessionContext,
};
use crate::test::partitioned_file_groups;
+ use crate::{assert_batches_eq, assert_batches_sorted_eq};
use arrow::array::Array;
use arrow::datatypes::{Field, SchemaBuilder};
use datafusion_common::cast::{as_int32_array, as_int64_array, as_string_array};
use datafusion_common::FileType;
- use flate2::write::GzEncoder;
- use flate2::Compression;
use object_store::chunked::ChunkedStore;
use object_store::local::LocalFileSystem;
use rstest::*;
- use std::fs::File;
- use std::io;
use tempfile::TempDir;
use url::Url;
@@ -892,36 +888,65 @@ mod tests {
Ok(())
}
- fn compress_file(path: &str, output_path: &str) -> io::Result<()> {
- let input_file = File::open(path)?;
- let mut reader = BufReader::new(input_file);
- let output_file = File::create(output_path)?;
- let writer = std::io::BufWriter::new(output_file);
-
- let mut encoder = GzEncoder::new(writer, Compression::default());
- io::copy(&mut reader, &mut encoder)?;
-
- encoder.finish()?;
- Ok(())
- }
+ #[rstest(
+ file_compression_type,
+ case::uncompressed(FileCompressionType::UNCOMPRESSED),
+ case::gzip(FileCompressionType::GZIP),
+ case::bzip2(FileCompressionType::BZIP2),
+ case::xz(FileCompressionType::XZ),
+ case::zstd(FileCompressionType::ZSTD)
+ )]
+ #[cfg(feature = "compression")]
#[tokio::test]
- async fn test_disable_parallel_for_json_gz() -> Result<()> {
+ async fn test_json_with_repartitioing(
+ file_compression_type: FileCompressionType,
+ ) -> Result<()> {
let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(4);
let ctx = SessionContext::new_with_config(config);
- let path = format!("{TEST_DATA_BASE}/1.json");
- let compressed_path = format!("{}.gz", &path);
- compress_file(&path, &compressed_path)?;
+
+ let tmp_dir = TempDir::new()?;
+ let (store_url, file_groups, _) =
prepare_store(&ctx.state(), file_compression_type, tmp_dir.path()).await;
+
+ // It's important to have less than `target_partitions` amount of file groups, to
+ // trigger repartitioning.
+ assert_eq!(
+ file_groups.len(),
+ 1,
+ "Expected prepared store with single file group"
+ );
+
+ let path = file_groups
+ .first()
+ .unwrap()
+ .first()
+ .unwrap()
+ .object_meta
+ .location
+ .as_ref();
+
+ let url: &Url = store_url.as_ref();
+ let path_buf = Path::new(url.path()).join(path);
+ let path = path_buf.to_str().unwrap();
+ let ext = FileType::JSON
+ .get_ext_with_compression(file_compression_type.to_owned())
+ .unwrap();
+
let read_option = NdJsonReadOptions::default()
- .file_compression_type(FileCompressionType::GZIP)
- .file_extension("gz");
- let df = ctx.read_json(compressed_path.clone(), read_option).await?;
+ .file_compression_type(file_compression_type)
+ .file_extension(ext.as_str());
+
+ let df = ctx.read_json(path, read_option).await?;
let res = df.collect().await;
- fs::remove_file(&compressed_path)?;
- assert_batches_eq!(
+
+ // Output sort order is nondeterministic due to multiple
+ // target partitions. To handle it, assert compares sorted
+ // result.
+ assert_batches_sorted_eq!(
&[
"+-----+------------------+---------------+------+",
"| a | b | c | d |",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]