Smotrov opened a new issue, #18947:
URL: https://github.com/apache/datafusion/issues/18947

   # DataFusion PR: Add Compression Level Configuration for JSON/CSV Output
   
   ## Issue Title
   **feat: Add compression level configuration for JSON and CSV writers**
   
   ---
   
   ## Issue Description
   
   ### Is your feature request related to a problem?
   
   When writing ZSTD-compressed JSON (NDJSON) files using 
`DataFrame::write_json()`, DataFusion uses the default ZSTD compression level 
(3). There's no way to configure a higher compression level (e.g., level 9) for 
better compression ratios.
   
   Currently, `JsonOptions` only exposes:
   ```rust
   pub struct JsonOptions {
       pub compression: CompressionTypeVariant,
       pub schema_infer_max_rec: Option<usize>,
   }
   ```
   
   The underlying `async-compression` crate supports configurable levels via 
`ZstdEncoder::with_quality(writer, Level::Precise(level))`, but DataFusion 
hardcodes the default:
   
   ```rust
   // In file_compression_type.rs
   CompressionTypeVariant::ZSTD => Box::new(ZstdEncoder::new(w))
   ```
   
   ### Describe the solution you'd like
   
   Add an optional `compression_level` field to `JsonOptions` and `CsvOptions`:
   
   ```rust
   pub struct JsonOptions {
       pub compression: CompressionTypeVariant,
       pub compression_level: Option<u32>,  // NEW
       pub schema_infer_max_rec: Option<usize>,
   }
   ```
   
   When specified, pass the level through to the encoder creation in 
`FileCompressionType`:
   
   ```rust
    CompressionTypeVariant::ZSTD => match compression_level {
        Some(level) => Box::new(ZstdEncoder::with_quality(w, Level::Precise(level as i32))),
        None => Box::new(ZstdEncoder::new(w)),  // Default level 3
    }
   ```
   
   ### Use Case
   
   - **Storage optimization**: Level 9 provides ~10-15% better compression than 
level 3
   - **Cost savings**: Smaller files = lower S3/GCS storage costs
   - **Bandwidth**: Faster transfers for compressed data over networks
   - **Consistency with Parquet**: Parquet already supports 
`parquet.compression_level` option
   
   ### ZSTD Level Reference
   
   | Level | Compression Ratio | Speed (MB/s) |
   |-------|------------------|--------------|
   | 1     | ~2.8:1           | ~500         |
   | 3     | ~3.2:1           | ~350         |
   | 9     | ~3.6:1           | ~90          |
   | 19    | ~4.0:1           | ~10          |
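   
   The "~10-15% better compression" claim above can be sanity-checked against the table with a small calculation. The ratios are the ballpark figures from the table, not DataFusion benchmarks, and the helper names below are illustrative only:
   
    ```rust
    /// Compressed size in bytes for `raw_bytes` of input at a given ratio.
    fn compressed_size(raw_bytes: f64, ratio: f64) -> f64 {
        raw_bytes / ratio
    }

    /// Fractional saving of level 9 (~3.6:1) over level 3 (~3.2:1).
    fn level9_saving_over_level3(raw_bytes: f64) -> f64 {
        let at_3 = compressed_size(raw_bytes, 3.2); // ~320 MiB per GiB of input
        let at_9 = compressed_size(raw_bytes, 3.6); // ~284 MiB per GiB of input
        1.0 - at_9 / at_3 // ~0.11, i.e. roughly 11% smaller output
    }
    ```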
   
   
   
   ### Additional context
   
   This would align JSON/CSV writers with Parquet, which already supports 
compression level configuration. The change is backward-compatible since the 
field is optional and defaults to the current behavior.
   
   Related Parquet issues for reference:
   - #7691 - Update Default Parquet Write Compression  
   - #7692 - Parquet compression level configuration
   
   ---
   
   ## Implementation Plan
   
   ### Files to Modify
   
   #### 1. `datafusion/common/src/config.rs`
   
   Add `compression_level` to `JsonOptions` and `CsvOptions`:
   
   ```rust
   config_namespace! {
       pub struct JsonOptions {
            pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
            /// Optional compression level (ZSTD: 1-22, GZIP: 0-9); `None` keeps the codec's default
            pub compression_level: Option<u32>, default = None
            pub schema_infer_max_rec: Option<usize>, default = None
       }
   }
   ```
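   
   Entries in the `config_namespace!` system are ultimately set from string values (e.g., via `SET` or COPY `OPTIONS`), so the new field needs string-to-integer parsing somewhere along the way. A minimal sketch of that parsing; the helper name and error format are hypothetical, not existing DataFusion API:
   
    ```rust
    // Hypothetical parsing helper (not existing DataFusion API): turn the
    // string value of a `compression_level` option into Option<u32>.
    fn parse_compression_level(value: &str) -> Result<Option<u32>, String> {
        let trimmed = value.trim();
        if trimmed.is_empty() {
            // Unset/empty keeps today's behavior: the codec's own default level.
            return Ok(None);
        }
        trimmed
            .parse::<u32>()
            .map(Some)
            .map_err(|e| format!("invalid compression_level '{trimmed}': {e}"))
    }
    ```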
   
   #### 2. `datafusion/datasource/src/file_format/file_compression_type.rs`
   
   Modify `FileCompressionType::convert_async_writer()` to accept an optional 
level:
   
   ```rust
   pub fn convert_async_writer(
       &self,
       w: Box<dyn AsyncWrite + Send + Unpin>,
       compression_level: Option<u32>,
   ) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
       match self.get_variant() {
           CompressionTypeVariant::ZSTD => {
               match compression_level {
                   Some(level) => Ok(Box::new(ZstdEncoder::with_quality(
                       w,
                       async_compression::Level::Precise(level as i32),
                   ))),
                   None => Ok(Box::new(ZstdEncoder::new(w))),
               }
           }
           CompressionTypeVariant::GZIP => {
               match compression_level {
                   Some(level) => Ok(Box::new(GzipEncoder::with_quality(
                       w,
                       async_compression::Level::Precise(level as i32),
                   ))),
                   None => Ok(Box::new(GzipEncoder::new(w))),
               }
           }
           // ... other variants
       }
   }
   ```
   
   #### 3. `datafusion/datasource/src/file_format/json.rs`
   
   Pass compression level from `JsonOptions` to the sink:
   
   ```rust
   impl JsonSink {
       // Add compression_level field and pass it to convert_async_writer()
   }
   ```
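   
   To make the stub slightly more concrete, here is a simplified, synchronous stand-in (`JsonSinkSketch` and its field are hypothetical names, not the actual `JsonSink` definition) showing the intended pass-through and the backward-compatible default:
   
    ```rust
    // Illustrative plumbing only: the sink carries the optional level from
    // JsonOptions and resolves it when the encoder is built. ZSTD's default
    // level is 3, so `None` preserves current behavior.
    struct JsonSinkSketch {
        compression_level: Option<u32>,
    }

    impl JsonSinkSketch {
        /// Level handed to ZstdEncoder::with_quality; falls back to the default.
        fn effective_zstd_level(&self) -> i32 {
            self.compression_level.map(|l| l as i32).unwrap_or(3)
        }
    }
    ```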
   
   #### 4. `datafusion/datasource/src/file_format/csv.rs`
   
   Apply the same changes to the CSV writer: add a `compression_level` field to `CsvSink` and forward it from `CsvOptions` to `convert_async_writer()`.
   
   #### 5. SQL OPTIONS Support
   
   Add parsing for `compression_level` in COPY TO statements:
   
   ```sql
   COPY table TO 's3://bucket/path/'
   OPTIONS (
       format json,
       compression zstd,
       compression_level 9
   );
   ```
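   
   Levels arriving through SQL `OPTIONS` are user input, so per-codec range validation would be worth adding before an encoder is built. A sketch of what that could look like (`validate_level` and `Codec` are hypothetical names, not existing DataFusion API; the bounds are the codecs' documented ranges, zstd 1-22 and gzip 0-9):
   
    ```rust
    // Hypothetical validation helper (not existing DataFusion API): reject
    // levels outside each codec's supported range before building an encoder.
    #[derive(Debug, Clone, Copy, PartialEq)]
    enum Codec {
        Zstd,
        Gzip,
    }

    fn validate_level(codec: Codec, level: u32) -> Result<u32, String> {
        // Documented upper bounds: zstd accepts levels up to 22, gzip up to 9.
        let max = match codec {
            Codec::Zstd => 22,
            Codec::Gzip => 9,
        };
        if level <= max {
            Ok(level)
        } else {
            Err(format!(
                "compression_level {level} exceeds maximum {max} for {codec:?}"
            ))
        }
    }
    ```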
   
   ---
   
   ## Example Usage After PR
   
   ### Rust API
   
   ```rust
   use datafusion::common::config::JsonOptions;
   use datafusion::common::parsers::CompressionTypeVariant;
   
   let json_opts = JsonOptions {
       compression: CompressionTypeVariant::ZSTD,
       compression_level: Some(9),  // Use level 9 for better compression
       schema_infer_max_rec: None,
   };
   
   df.write_json("s3://bucket/output/", write_options, Some(json_opts)).await?;
   ```
   
   ### SQL
   
   ```sql
   -- Write with ZSTD level 9
   COPY my_table TO 's3://bucket/output/'
   OPTIONS (
       format 'json',
       compression 'zstd',
       compression_level 9
   );
   
   -- Write with GZIP level 6
   COPY my_table TO 's3://bucket/output/'
   OPTIONS (
       format 'csv',
       compression 'gzip',
       compression_level 6
   );
   ```
   
   ---
   
   ## References
   
   - [async-compression Level enum](https://docs.rs/async-compression/latest/async_compression/struct.Level.html)
   - [ZSTD compression levels](https://facebook.github.io/zstd/zstd_manual.html)
   - [DataFusion Parquet compression options](https://datafusion.apache.org/user-guide/configs.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

