This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 7d16cd039b Use zstd::bulk API in IPC and Parquet with context reuse for compression and decompression (#9400)
7d16cd039b is described below
commit 7d16cd039b74fc24a766eb852856278de7f4567b
Author: Daniël Heres <[email protected]>
AuthorDate: Fri Feb 13 00:02:58 2026 +0100
Use zstd::bulk API in IPC and Parquet with context reuse for compression and decompression (#9400)
# Which issue does this PR close?
- Closes #9401
# Rationale for this change
Switch the Parquet and IPC zstd codecs from the streaming API
(zstd::Encoder/Decoder) to the bulk API (zstd::bulk::Compressor/Decompressor)
with reusable contexts. This avoids the overhead of reinitializing a zstd
context on every compress/decompress call, yielding an ~8-11% speedup on
benchmarks.
- Parquet: store a Compressor and Decompressor in ZSTDCodec, reused across
  calls.
- IPC: add a DecompressionContext (mirroring the existing CompressionContext)
  with a reusable bulk Decompressor, threaded through RecordBatchDecoder.
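For illustration, a minimal standalone sketch of the reuse pattern (the
round-trip helper and compression level below are examples, not code from this
PR):

```rust
use zstd::bulk::{Compressor, Decompressor};

// Contexts are created once and reused for every buffer, instead of
// constructing a new zstd::Encoder/Decoder per call.
fn roundtrip_many(buffers: &[Vec<u8>]) -> std::io::Result<()> {
    let mut compressor = Compressor::new(3)?; // level 3 chosen arbitrarily
    let mut decompressor = Decompressor::new()?;

    for buf in buffers {
        let compressed = compressor.compress(buf)?;
        // The bulk decompressor needs an upper bound on the output size.
        let decompressed = decompressor.decompress(&compressed, buf.len())?;
        assert_eq!(&decompressed, buf);
    }
    Ok(())
}
```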
```
Benchmark: cargo bench -p parquet --features experimental --bench compression -- "Zstd"
┌────────────────────────────────┬──────────┬───────────┬────────┐
│ Benchmark                      │ Main     │ Optimized │ Change │
├────────────────────────────────┼──────────┼───────────┼────────┤
│ compress ZSTD - alphanumeric   │ 866 µs   │ 789 µs    │ -9.6%  │
├────────────────────────────────┼──────────┼───────────┼────────┤
│ decompress ZSTD - alphanumeric │ 1.125 ms │ 1.007 ms  │ -8.8%  │
├────────────────────────────────┼──────────┼───────────┼────────┤
│ compress ZSTD - words          │ 2.869 ms │ 2.590 ms  │ -9.7%  │
├────────────────────────────────┼──────────┼───────────┼────────┤
│ decompress ZSTD - words        │ 1.001 ms │ 848 µs    │ -10.6% │
└────────────────────────────────┴──────────┴───────────┴────────┘
IPC Reader Decompression (10 batches)
Benchmark: cargo bench -p arrow-ipc --features zstd --bench ipc_reader -- "zstd"
┌─────────────────────────────────────────┬──────────┬───────────┬────────┐
│ Benchmark                               │ Main     │ Optimized │ Change │
├─────────────────────────────────────────┼──────────┼───────────┼────────┤
│ StreamReader/read_10/zstd               │ 2.756 ms │ 2.540 ms  │ -7.8%  │
├─────────────────────────────────────────┼──────────┼───────────┼────────┤
│ StreamReader/no_validation/read_10/zstd │ 2.601 ms │ 2.352 ms  │ -9.6%  │
└─────────────────────────────────────────┴──────────┴───────────┴────────┘
```
# What changes are included in this PR?
# Are these changes tested?
# Are there any user-facing changes?
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
arrow-ipc/src/compression.rs | 82 ++++++++++++++++++++++++++++++++++++++------
arrow-ipc/src/reader.rs | 17 +++++++--
parquet/src/compression.rs | 42 ++++++++++++++---------
3 files changed, 111 insertions(+), 30 deletions(-)
diff --git a/arrow-ipc/src/compression.rs b/arrow-ipc/src/compression.rs
index 9bbc6e752c..1b7c84d9f0 100644
--- a/arrow-ipc/src/compression.rs
+++ b/arrow-ipc/src/compression.rs
@@ -57,6 +57,43 @@ impl std::fmt::Debug for CompressionContext {
}
}
+/// Additional context that may be needed for decompression.
+///
+/// In the case of zstd, this will contain the zstd decompression context, which can be reused
+/// between subsequent decompression calls to avoid the performance overhead of initialising a new
+/// context for every decompression.
+pub struct DecompressionContext {
+ #[cfg(feature = "zstd")]
+ decompressor: zstd::bulk::Decompressor<'static>,
+}
+
+impl DecompressionContext {
+ pub(crate) fn new() -> Self {
+ Default::default()
+ }
+}
+
+#[allow(clippy::derivable_impls)]
+impl Default for DecompressionContext {
+ fn default() -> Self {
+ DecompressionContext {
+ #[cfg(feature = "zstd")]
+            decompressor: zstd::bulk::Decompressor::new().expect("can create zstd decompressor"),
+ }
+ }
+}
+
+impl std::fmt::Debug for DecompressionContext {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let mut ds = f.debug_struct("DecompressionContext");
+
+ #[cfg(feature = "zstd")]
+ ds.field("decompressor", &"zstd::bulk::Decompressor");
+
+ ds.finish()
+ }
+}
+
/// Represents compressing a ipc stream using a particular compression algorithm
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionCodec {
@@ -126,7 +163,11 @@ impl CompressionCodec {
/// [8 bytes]: uncompressed length
/// [remaining bytes]: compressed data stream
/// ```
-    pub(crate) fn decompress_to_buffer(&self, input: &Buffer) -> Result<Buffer, ArrowError> {
+ pub(crate) fn decompress_to_buffer(
+ &self,
+ input: &Buffer,
+ context: &mut DecompressionContext,
+ ) -> Result<Buffer, ArrowError> {
// read the first 8 bytes to determine if the data is
// compressed
let decompressed_length = read_uncompressed_size(input);
@@ -139,7 +180,7 @@ impl CompressionCodec {
        } else if let Ok(decompressed_length) = usize::try_from(decompressed_length) {
// decompress data using the codec
let input_data = &input[(LENGTH_OF_PREFIX_DATA as usize)..];
- let v = self.decompress(input_data, decompressed_length as _)?;
+            let v = self.decompress(input_data, decompressed_length as _, context)?;
Buffer::from_vec(v)
} else {
return Err(ArrowError::IpcError(format!(
@@ -165,10 +206,15 @@ impl CompressionCodec {
/// Decompress the data in input buffer and write to output buffer
/// using the specified compression
-    fn decompress(&self, input: &[u8], decompressed_size: usize) -> Result<Vec<u8>, ArrowError> {
+ fn decompress(
+ &self,
+ input: &[u8],
+ decompressed_size: usize,
+ context: &mut DecompressionContext,
+ ) -> Result<Vec<u8>, ArrowError> {
let ret = match self {
            CompressionCodec::Lz4Frame => decompress_lz4(input, decompressed_size)?,
-            CompressionCodec::Zstd => decompress_zstd(input, decompressed_size)?,
+            CompressionCodec::Zstd => decompress_zstd(input, decompressed_size, context)?,
};
if ret.len() != decompressed_size {
return Err(ArrowError::IpcError(format!(
@@ -239,16 +285,22 @@ fn compress_zstd(
}
#[cfg(feature = "zstd")]
-fn decompress_zstd(input: &[u8], decompressed_size: usize) -> Result<Vec<u8>, ArrowError> {
- use std::io::Read;
- let mut output = Vec::with_capacity(decompressed_size);
- zstd::Decoder::with_buffer(input)?.read_to_end(&mut output)?;
+fn decompress_zstd(
+ input: &[u8],
+ decompressed_size: usize,
+ context: &mut DecompressionContext,
+) -> Result<Vec<u8>, ArrowError> {
+ let output = context.decompressor.decompress(input, decompressed_size)?;
Ok(output)
}
#[cfg(not(feature = "zstd"))]
#[allow(clippy::ptr_arg)]
-fn decompress_zstd(_input: &[u8], _decompressed_size: usize) -> Result<Vec<u8>, ArrowError> {
+fn decompress_zstd(
+ _input: &[u8],
+ _decompressed_size: usize,
+ _context: &mut DecompressionContext,
+) -> Result<Vec<u8>, ArrowError> {
Err(ArrowError::InvalidArgumentError(
"zstd IPC decompression requires the zstd feature".to_string(),
))
@@ -278,7 +330,11 @@ mod tests {
.compress(input_bytes, &mut output_bytes, &mut Default::default())
.unwrap();
let result = codec
- .decompress(output_bytes.as_slice(), input_bytes.len())
+ .decompress(
+ output_bytes.as_slice(),
+ input_bytes.len(),
+ &mut Default::default(),
+ )
.unwrap();
assert_eq!(input_bytes, result.as_slice());
}
@@ -293,7 +349,11 @@ mod tests {
.compress(input_bytes, &mut output_bytes, &mut Default::default())
.unwrap();
let result = codec
- .decompress(output_bytes.as_slice(), input_bytes.len())
+ .decompress(
+ output_bytes.as_slice(),
+ input_bytes.len(),
+ &mut Default::default(),
+ )
.unwrap();
assert_eq!(input_bytes, result.as_slice());
}
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index d5cda3e4e5..6a5dc707d7 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -42,7 +42,7 @@ use arrow_buffer::{
use arrow_data::{ArrayData, ArrayDataBuilder, UnsafeFlag};
use arrow_schema::*;
-use crate::compression::CompressionCodec;
+use crate::compression::{CompressionCodec, DecompressionContext};
use crate::r#gen::Message::{self};
use crate::{Block, CONTINUATION_MARKER, FieldNode, MetadataVersion};
use DataType::*;
@@ -60,13 +60,16 @@ fn read_buffer(
buf: &crate::Buffer,
a_data: &Buffer,
compression_codec: Option<CompressionCodec>,
+ decompression_context: &mut DecompressionContext,
) -> Result<Buffer, ArrowError> {
let start_offset = buf.offset() as usize;
    let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
// corner case: empty buffer
match (buf_data.is_empty(), compression_codec) {
(true, _) | (_, None) => Ok(buf_data),
-        (false, Some(decompressor)) => decompressor.decompress_to_buffer(&buf_data),
+ (false, Some(decompressor)) => {
+ decompressor.decompress_to_buffer(&buf_data, decompression_context)
+ }
}
}
impl RecordBatchDecoder<'_> {
@@ -444,6 +447,8 @@ pub struct RecordBatchDecoder<'a> {
dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
/// Optional compression codec
compression: Option<CompressionCodec>,
+ /// Decompression context for reusing zstd decompressor state
+ decompression_context: DecompressionContext,
/// The format version
version: MetadataVersion,
/// The raw data buffer
@@ -490,6 +495,7 @@ impl<'a> RecordBatchDecoder<'a> {
schema,
dictionaries_by_id,
compression,
+ decompression_context: DecompressionContext::new(),
version: *metadata,
data: buf,
nodes: field_nodes.iter(),
@@ -606,7 +612,12 @@ impl<'a> RecordBatchDecoder<'a> {
let buffer = self.buffers.next().ok_or_else(|| {
            ArrowError::IpcError("Buffer count mismatched with metadata".to_string())
})?;
- read_buffer(buffer, self.data, self.compression)
+ read_buffer(
+ buffer,
+ self.data,
+ self.compression,
+ &mut self.decompression_context,
+ )
}
fn skip_buffer(&mut self) {
diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs
index 530838955c..fe2fb59c5b 100644
--- a/parquet/src/compression.rs
+++ b/parquet/src/compression.rs
@@ -503,20 +503,27 @@ pub use lz4_codec::*;
#[cfg(any(feature = "zstd", test))]
mod zstd_codec {
- use std::io::{self, Write};
-
use crate::compression::{Codec, ZstdLevel};
use crate::errors::Result;
/// Codec for Zstandard compression algorithm.
+ ///
+ /// Uses `zstd::bulk` API with reusable compressor/decompressor contexts
+ /// to avoid the overhead of reinitializing contexts for each operation.
pub struct ZSTDCodec {
- level: ZstdLevel,
+ compressor: zstd::bulk::Compressor<'static>,
+ decompressor: zstd::bulk::Decompressor<'static>,
}
impl ZSTDCodec {
/// Creates new Zstandard compression codec.
pub(crate) fn new(level: ZstdLevel) -> Self {
- Self { level }
+ Self {
+            compressor: zstd::bulk::Compressor::new(level.compression_level())
+ .expect("valid zstd compression level"),
+ decompressor: zstd::bulk::Decompressor::new()
+ .expect("can create zstd decompressor"),
+ }
}
}
@@ -525,22 +532,25 @@ mod zstd_codec {
&mut self,
input_buf: &[u8],
output_buf: &mut Vec<u8>,
- _uncompress_size: Option<usize>,
+ uncompress_size: Option<usize>,
) -> Result<usize> {
- let mut decoder = zstd::Decoder::new(input_buf)?;
- match io::copy(&mut decoder, output_buf) {
- Ok(n) => Ok(n as usize),
- Err(e) => Err(e.into()),
- }
+ let capacity = uncompress_size.unwrap_or_else(|| {
+ // Get the decompressed size from the zstd frame header
+ zstd::zstd_safe::get_frame_content_size(input_buf)
+ .ok()
+ .flatten()
+ .unwrap_or(input_buf.len() as u64 * 4) as usize
+ });
+            let decompressed = self.decompressor.decompress(input_buf, capacity)?;
+ let len = decompressed.len();
+ output_buf.extend_from_slice(&decompressed);
+ Ok(len)
}
        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
- let mut encoder = zstd::Encoder::new(output_buf, self.level.0)?;
- encoder.write_all(input_buf)?;
- match encoder.finish() {
- Ok(_) => Ok(()),
- Err(e) => Err(e.into()),
- }
+ let compressed = self.compressor.compress(input_buf)?;
+ output_buf.extend_from_slice(&compressed);
+ Ok(())
}
}
}