Smotrov opened a new issue, #18947:
URL: https://github.com/apache/datafusion/issues/18947
# DataFusion PR: Add Compression Level Configuration for JSON/CSV Output
## Issue Title
**feat: Add compression level configuration for JSON and CSV writers**
---
## Issue Description
### Is your feature request related to a problem?
When writing ZSTD-compressed JSON (NDJSON) files using
`DataFrame::write_json()`, DataFusion uses the default ZSTD compression level
(3). There's no way to configure a higher compression level (e.g., level 9) for
better compression ratios.
Currently, `JsonOptions` only exposes:
```rust
pub struct JsonOptions {
    pub compression: CompressionTypeVariant,
    pub schema_infer_max_rec: Option<usize>,
}
```
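In other words, the most a caller can do today is pick the codec; there is no field for the level. A minimal illustration (reusing the `write_json` call shown later in this issue):

```rust
// Current API: the codec is selectable, but the ZSTD level is always
// async-compression's default (3); there is no field to raise it.
let json_opts = JsonOptions {
    compression: CompressionTypeVariant::ZSTD,
    schema_infer_max_rec: None,
};
df.write_json("s3://bucket/output/", write_options, Some(json_opts)).await?;
```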
The underlying `async-compression` crate supports configurable levels via
`ZstdEncoder::with_quality(writer, Level::Precise(level))`, but DataFusion
hardcodes the default:
```rust
// In file_compression_type.rs
CompressionTypeVariant::ZSTD => Box::new(ZstdEncoder::new(w))
```
### Describe the solution you'd like
Add an optional `compression_level` field to `JsonOptions` and `CsvOptions`:
```rust
pub struct JsonOptions {
    pub compression: CompressionTypeVariant,
    pub compression_level: Option<u32>, // NEW
    pub schema_infer_max_rec: Option<usize>,
}
```
When specified, pass the level through to the encoder creation in
`FileCompressionType`:
```rust
CompressionTypeVariant::ZSTD => match compression_level {
    Some(level) => Box::new(ZstdEncoder::with_quality(
        w,
        Level::Precise(level as i32),
    )),
    None => Box::new(ZstdEncoder::new(w)), // Default level 3
},
```
### Use Case
- **Storage optimization**: Level 9 provides ~10-15% better compression than
level 3
- **Cost savings**: Smaller files = lower S3/GCS storage costs
- **Bandwidth**: Faster transfers for compressed data over networks
- **Consistency with Parquet**: Parquet already supports the
  `parquet.compression_level` option (see the example after this list)
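For reference, a sketch of the existing Parquet analog: in current DataFusion the level is embedded in the codec string rather than passed as a separate option (the exact option key varies by version):

```sql
-- Existing Parquet behavior: the ZSTD level rides inside the codec string.
COPY my_table TO 's3://bucket/output/' STORED AS PARQUET
OPTIONS ('format.compression' 'zstd(9)');
```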
### ZSTD Level Reference
| Level | Compression Ratio | Speed (MB/s) |
|-------|------------------|--------------|
| 1 | ~2.8:1 | ~500 |
| 3 | ~3.2:1 | ~350 |
| 9 | ~3.6:1 | ~90 |
| 19 | ~4.0:1 | ~10 |
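Worked through with the table's ratios: 100 GB of raw NDJSON compresses to ~31.3 GB at level 3 (3.2:1) versus ~27.8 GB at level 9 (3.6:1), i.e. roughly 11% less output in exchange for roughly 4x lower compression throughput (350 → 90 MB/s).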
### Additional context
This would align JSON/CSV writers with Parquet, which already supports
compression level configuration. The change is backward-compatible since the
field is optional and defaults to the current behavior.
Related Parquet issues for reference:
- #7691 - Update Default Parquet Write Compression
- #7692 - Parquet compression level configuration
---
## Implementation Plan
### Files to Modify
#### 1. `datafusion/common/src/config.rs`
Add `compression_level` to `JsonOptions` and `CsvOptions`:
```rust
config_namespace! {
    pub struct JsonOptions {
        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
        /// Optional compression level (ZSTD: 1-22, GZIP: 0-9). Note that a
        /// `u32` cannot express ZSTD's negative "fast" levels (-7 to -1).
        pub compression_level: Option<u32>, default = None
        pub schema_infer_max_rec: Option<usize>, default = None
    }
}
```
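Because the valid range differs per codec, the new option could be range-checked up front. A minimal sketch (this helper does not exist in DataFusion today; variants per `CompressionTypeVariant`):

```rust
use datafusion::common::parsers::CompressionTypeVariant;

/// Hypothetical range check, illustrative only. With a `u32` field,
/// ZSTD's negative "fast" levels (-7 to -1) are not representable, so
/// only 1-22 are accepted for ZSTD here.
fn validate_compression_level(
    variant: &CompressionTypeVariant,
    level: u32,
) -> Result<(), String> {
    let valid = match variant {
        CompressionTypeVariant::ZSTD => (1..=22).contains(&level),
        CompressionTypeVariant::GZIP => level <= 9,
        CompressionTypeVariant::BZIP2 => (1..=9).contains(&level),
        CompressionTypeVariant::XZ => level <= 9,
        CompressionTypeVariant::UNCOMPRESSED => false,
    };
    if valid {
        Ok(())
    } else {
        Err(format!("invalid compression_level {level} for {variant:?}"))
    }
}
```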
#### 2. `datafusion/datasource/src/file_format/file_compression_type.rs`
Modify `FileCompressionType::convert_async_writer()` to accept an optional
level:
```rust
pub fn convert_async_writer(
    &self,
    w: Box<dyn AsyncWrite + Send + Unpin>,
    compression_level: Option<u32>,
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
    match self.get_variant() {
        CompressionTypeVariant::ZSTD => match compression_level {
            Some(level) => Ok(Box::new(ZstdEncoder::with_quality(
                w,
                async_compression::Level::Precise(level as i32),
            ))),
            None => Ok(Box::new(ZstdEncoder::new(w))),
        },
        CompressionTypeVariant::GZIP => match compression_level {
            Some(level) => Ok(Box::new(GzipEncoder::with_quality(
                w,
                async_compression::Level::Precise(level as i32),
            ))),
            None => Ok(Box::new(GzipEncoder::new(w))),
        },
        // ... other variants
    }
}
```
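The ZSTD and GZIP arms above repeat the same `Option<u32>` → `Level` mapping, and the same pattern extends to the remaining variants, since every async-compression encoder exposes a `with_quality` constructor. A small shared helper could collapse the duplication (`to_level` is a sketch, not existing DataFusion code):

```rust
use async_compression::Level;

// Map the optional user-supplied level onto async-compression's Level
// enum. The encoders' `new` constructors use Level::Default internally,
// so returning Level::Default for None preserves today's behavior.
fn to_level(compression_level: Option<u32>) -> Level {
    match compression_level {
        Some(level) => Level::Precise(level as i32),
        None => Level::Default,
    }
}
```

Each arm then reduces to a single line, e.g. `Ok(Box::new(ZstdEncoder::with_quality(w, to_level(compression_level))))`.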
#### 3. `datafusion/datasource/src/file_format/json.rs`
Pass compression level from `JsonOptions` to the sink:
```rust
impl JsonSink {
    // Add a compression_level field and pass it to convert_async_writer()
}
```
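A possible shape for that change, with illustrative names only (the real `JsonSink` carries more state, and the import paths are indicative):

```rust
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::error::Result;
use tokio::io::AsyncWrite;

// Sketch: the level captured from JsonOptions travels with the sink and
// is forwarded when the compressed writer is constructed.
struct JsonSinkSketch {
    file_compression_type: FileCompressionType,
    compression_level: Option<u32>, // NEW: copied from JsonOptions
}

impl JsonSinkSketch {
    fn compressed_writer(
        &self,
        raw: Box<dyn AsyncWrite + Send + Unpin>,
    ) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
        // None falls back to each codec's current default level, matching
        // the convert_async_writer() signature proposed above.
        self.file_compression_type
            .convert_async_writer(raw, self.compression_level)
    }
}
```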
#### 4. `datafusion/datasource/src/file_format/csv.rs`
Same changes for CSV writer.
#### 5. SQL OPTIONS Support
Add parsing for `compression_level` in COPY TO statements:
```sql
COPY table TO 's3://bucket/path/'
OPTIONS (
    format 'json',
    compression 'zstd',
    compression_level 9
);
```
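Since OPTIONS values reach the planner as strings, the new key also needs an integer parse step; a minimal sketch (hypothetical helper, not an existing DataFusion function):

```rust
/// Hypothetical parse step: `compression_level 9` arrives as the string "9".
fn parse_compression_level(value: &str) -> Result<u32, String> {
    value.trim().parse::<u32>().map_err(|_| {
        format!("compression_level must be a non-negative integer, got '{value}'")
    })
}
```

The parsed value would then be stored on `JsonOptions`/`CsvOptions` and validated against the codec as sketched earlier.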
---
## Example Usage After PR
### Rust API
```rust
use datafusion::common::config::JsonOptions;
use datafusion::common::parsers::CompressionTypeVariant;
let json_opts = JsonOptions {
    compression: CompressionTypeVariant::ZSTD,
    compression_level: Some(9), // Use level 9 for better compression
    schema_infer_max_rec: None,
};
df.write_json("s3://bucket/output/", write_options, Some(json_opts)).await?;
```
### SQL
```sql
-- Write with ZSTD level 9
COPY my_table TO 's3://bucket/output/'
OPTIONS (
    format 'json',
    compression 'zstd',
    compression_level 9
);

-- Write with GZIP level 6
COPY my_table TO 's3://bucket/output/'
OPTIONS (
    format 'csv',
    compression 'gzip',
    compression_level 6
);
```
---
## References
- [async-compression `Level` enum](https://docs.rs/async-compression/latest/async_compression/enum.Level.html)
- [ZSTD compression levels](https://facebook.github.io/zstd/zstd_manual.html)
- [DataFusion Parquet compression options](https://datafusion.apache.org/user-guide/configs.html)