This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d16cd039b Use zstd::bulk API in IPC and Parquet with context reuse for compression and decompression (#9400)
7d16cd039b is described below

commit 7d16cd039b74fc24a766eb852856278de7f4567b
Author: Daniël Heres <[email protected]>
AuthorDate: Fri Feb 13 00:02:58 2026 +0100

    Use zstd::bulk API in IPC and Parquet with context reuse for compression and decompression (#9400)
    
    # Which issue does this PR close?
    
    - Closes #9401
    
    # Rationale for this change
    Switch the Parquet and IPC zstd codecs from the streaming API
    (zstd::Encoder/Decoder) to the bulk API
    (zstd::bulk::Compressor/Decompressor) with reusable contexts. This
    avoids the overhead of reinitializing a zstd context on every
    compress/decompress call, yielding an ~8-11% speedup on the benchmarks
    below.
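    
    A minimal sketch (not this PR's code verbatim) of the before/after
    decompression pattern, assuming the zstd crate's streaming and bulk
    APIs; `decompress_streaming`, `decompress_bulk`, and `ctx` are
    hypothetical names:
    
    ```
    // Before: a streaming Decoder is constructed for every buffer.
    fn decompress_streaming(input: &[u8]) -> std::io::Result<Vec<u8>> {
        use std::io::Read;
        let mut out = Vec::new();
        zstd::Decoder::with_buffer(input)?.read_to_end(&mut out)?;
        Ok(out)
    }
    
    // After: a bulk Decompressor created once by the caller is reused for
    // every call, so the zstd context is not reinitialized per buffer.
    fn decompress_bulk(
        ctx: &mut zstd::bulk::Decompressor<'static>,
        input: &[u8],
        decompressed_size: usize,
    ) -> std::io::Result<Vec<u8>> {
        ctx.decompress(input, decompressed_size)
    }
    ```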
    
    Parquet: store a Compressor and a Decompressor in ZSTDCodec, reused
    across calls. IPC: add a DecompressionContext (mirroring the existing
    CompressionContext) with a reusable bulk Decompressor, threaded through
    RecordBatchDecoder.
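    
    A hedged sketch of where the reused contexts live, per the description
    above; `ReusableZstd` and `roundtrip` are hypothetical names, not the
    actual ZSTDCodec or DecompressionContext types:
    
    ```
    // Hypothetical struct owning bulk zstd contexts, created once and reused.
    struct ReusableZstd {
        compressor: zstd::bulk::Compressor<'static>,
        decompressor: zstd::bulk::Decompressor<'static>,
    }
    
    impl ReusableZstd {
        fn new(level: i32) -> std::io::Result<Self> {
            Ok(Self {
                compressor: zstd::bulk::Compressor::new(level)?,
                decompressor: zstd::bulk::Decompressor::new()?,
            })
        }
    
        // Compress then decompress with the same contexts; repeated calls
        // pay no per-call context-initialization cost.
        fn roundtrip(&mut self, input: &[u8]) -> std::io::Result<Vec<u8>> {
            let compressed = self.compressor.compress(input)?;
            self.decompressor.decompress(&compressed, input.len())
        }
    }
    ```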
    
    
    ```
      Benchmark: cargo bench -p parquet --features experimental --bench compression -- "Zstd"
      ┌────────────────────────────────┬──────────┬───────────┬────────┐
      │           Benchmark            │   Main   │ Optimized │ Change │
      ├────────────────────────────────┼──────────┼───────────┼────────┤
      │ compress ZSTD - alphanumeric   │ 866 µs   │ 789 µs    │ -9.6%  │
      ├────────────────────────────────┼──────────┼───────────┼────────┤
      │ decompress ZSTD - alphanumeric │ 1.125 ms │ 1.007 ms  │ -8.8%  │
      ├────────────────────────────────┼──────────┼───────────┼────────┤
      │ compress ZSTD - words          │ 2.869 ms │ 2.590 ms  │ -9.7%  │
      ├────────────────────────────────┼──────────┼───────────┼────────┤
      │ decompress ZSTD - words        │ 1.001 ms │ 848 µs    │ -10.6% │
      └────────────────────────────────┴──────────┴───────────┴────────┘
      IPC Reader Decompression (10 batches)
    
      Benchmark: cargo bench -p arrow-ipc --features zstd --bench ipc_reader -- "zstd"
      ┌─────────────────────────────────────────┬──────────┬───────────┬────────┐
      │                Benchmark                │   Main   │ Optimized │ Change │
      ├─────────────────────────────────────────┼──────────┼───────────┼────────┤
      │ StreamReader/read_10/zstd               │ 2.756 ms │ 2.540 ms  │ -7.8%  │
      ├─────────────────────────────────────────┼──────────┼───────────┼────────┤
      │ StreamReader/no_validation/read_10/zstd │ 2.601 ms │ 2.352 ms  │ -9.6%  │
      └─────────────────────────────────────────┴──────────┴───────────┴────────┘
    ```
    
    # What changes are included in this PR?
    
    # Are these changes tested?
    
    
    # Are there any user-facing changes?
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 arrow-ipc/src/compression.rs | 82 ++++++++++++++++++++++++++++++++++++++------
 arrow-ipc/src/reader.rs      | 17 +++++++--
 parquet/src/compression.rs   | 42 ++++++++++++++---------
 3 files changed, 111 insertions(+), 30 deletions(-)

diff --git a/arrow-ipc/src/compression.rs b/arrow-ipc/src/compression.rs
index 9bbc6e752c..1b7c84d9f0 100644
--- a/arrow-ipc/src/compression.rs
+++ b/arrow-ipc/src/compression.rs
@@ -57,6 +57,43 @@ impl std::fmt::Debug for CompressionContext {
     }
 }
 
+/// Additional context that may be needed for decompression.
+///
+/// In the case of zstd, this will contain the zstd decompression context, which can be reused
+/// between subsequent decompression calls to avoid the performance overhead of initialising a new
+/// context for every decompression.
+pub struct DecompressionContext {
+    #[cfg(feature = "zstd")]
+    decompressor: zstd::bulk::Decompressor<'static>,
+}
+
+impl DecompressionContext {
+    pub(crate) fn new() -> Self {
+        Default::default()
+    }
+}
+
+#[allow(clippy::derivable_impls)]
+impl Default for DecompressionContext {
+    fn default() -> Self {
+        DecompressionContext {
+            #[cfg(feature = "zstd")]
+            decompressor: zstd::bulk::Decompressor::new().expect("can create 
zstd decompressor"),
+        }
+    }
+}
+
+impl std::fmt::Debug for DecompressionContext {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut ds = f.debug_struct("DecompressionContext");
+
+        #[cfg(feature = "zstd")]
+        ds.field("decompressor", &"zstd::bulk::Decompressor");
+
+        ds.finish()
+    }
+}
+
 /// Represents compressing a ipc stream using a particular compression algorithm
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum CompressionCodec {
@@ -126,7 +163,11 @@ impl CompressionCodec {
     /// [8 bytes]:         uncompressed length
     /// [remaining bytes]: compressed data stream
     /// ```
-    pub(crate) fn decompress_to_buffer(&self, input: &Buffer) -> Result<Buffer, ArrowError> {
+    pub(crate) fn decompress_to_buffer(
+        &self,
+        input: &Buffer,
+        context: &mut DecompressionContext,
+    ) -> Result<Buffer, ArrowError> {
         // read the first 8 bytes to determine if the data is
         // compressed
         let decompressed_length = read_uncompressed_size(input);
@@ -139,7 +180,7 @@ impl CompressionCodec {
         } else if let Ok(decompressed_length) = usize::try_from(decompressed_length) {
             // decompress data using the codec
             let input_data = &input[(LENGTH_OF_PREFIX_DATA as usize)..];
-            let v = self.decompress(input_data, decompressed_length as _)?;
+            let v = self.decompress(input_data, decompressed_length as _, context)?;
             Buffer::from_vec(v)
         } else {
             return Err(ArrowError::IpcError(format!(
@@ -165,10 +206,15 @@ impl CompressionCodec {
 
     /// Decompress the data in input buffer and write to output buffer
     /// using the specified compression
-    fn decompress(&self, input: &[u8], decompressed_size: usize) -> Result<Vec<u8>, ArrowError> {
+    fn decompress(
+        &self,
+        input: &[u8],
+        decompressed_size: usize,
+        context: &mut DecompressionContext,
+    ) -> Result<Vec<u8>, ArrowError> {
         let ret = match self {
             CompressionCodec::Lz4Frame => decompress_lz4(input, decompressed_size)?,
-            CompressionCodec::Zstd => decompress_zstd(input, decompressed_size)?,
+            CompressionCodec::Zstd => decompress_zstd(input, decompressed_size, context)?,
         };
         if ret.len() != decompressed_size {
             return Err(ArrowError::IpcError(format!(
@@ -239,16 +285,22 @@ fn compress_zstd(
 }
 
 #[cfg(feature = "zstd")]
-fn decompress_zstd(input: &[u8], decompressed_size: usize) -> Result<Vec<u8>, ArrowError> {
-    use std::io::Read;
-    let mut output = Vec::with_capacity(decompressed_size);
-    zstd::Decoder::with_buffer(input)?.read_to_end(&mut output)?;
+fn decompress_zstd(
+    input: &[u8],
+    decompressed_size: usize,
+    context: &mut DecompressionContext,
+) -> Result<Vec<u8>, ArrowError> {
+    let output = context.decompressor.decompress(input, decompressed_size)?;
     Ok(output)
 }
 
 #[cfg(not(feature = "zstd"))]
 #[allow(clippy::ptr_arg)]
-fn decompress_zstd(_input: &[u8], _decompressed_size: usize) -> Result<Vec<u8>, ArrowError> {
+fn decompress_zstd(
+    _input: &[u8],
+    _decompressed_size: usize,
+    _context: &mut DecompressionContext,
+) -> Result<Vec<u8>, ArrowError> {
     Err(ArrowError::InvalidArgumentError(
         "zstd IPC decompression requires the zstd feature".to_string(),
     ))
@@ -278,7 +330,11 @@ mod tests {
             .compress(input_bytes, &mut output_bytes, &mut Default::default())
             .unwrap();
         let result = codec
-            .decompress(output_bytes.as_slice(), input_bytes.len())
+            .decompress(
+                output_bytes.as_slice(),
+                input_bytes.len(),
+                &mut Default::default(),
+            )
             .unwrap();
         assert_eq!(input_bytes, result.as_slice());
     }
@@ -293,7 +349,11 @@ mod tests {
             .compress(input_bytes, &mut output_bytes, &mut Default::default())
             .unwrap();
         let result = codec
-            .decompress(output_bytes.as_slice(), input_bytes.len())
+            .decompress(
+                output_bytes.as_slice(),
+                input_bytes.len(),
+                &mut Default::default(),
+            )
             .unwrap();
         assert_eq!(input_bytes, result.as_slice());
     }
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index d5cda3e4e5..6a5dc707d7 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -42,7 +42,7 @@ use arrow_buffer::{
 use arrow_data::{ArrayData, ArrayDataBuilder, UnsafeFlag};
 use arrow_schema::*;
 
-use crate::compression::CompressionCodec;
+use crate::compression::{CompressionCodec, DecompressionContext};
 use crate::r#gen::Message::{self};
 use crate::{Block, CONTINUATION_MARKER, FieldNode, MetadataVersion};
 use DataType::*;
@@ -60,13 +60,16 @@ fn read_buffer(
     buf: &crate::Buffer,
     a_data: &Buffer,
     compression_codec: Option<CompressionCodec>,
+    decompression_context: &mut DecompressionContext,
 ) -> Result<Buffer, ArrowError> {
     let start_offset = buf.offset() as usize;
     let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
     // corner case: empty buffer
     match (buf_data.is_empty(), compression_codec) {
         (true, _) | (_, None) => Ok(buf_data),
-        (false, Some(decompressor)) => decompressor.decompress_to_buffer(&buf_data),
+        (false, Some(decompressor)) => {
+            decompressor.decompress_to_buffer(&buf_data, decompression_context)
+        }
     }
 }
 impl RecordBatchDecoder<'_> {
@@ -444,6 +447,8 @@ pub struct RecordBatchDecoder<'a> {
     dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
     /// Optional compression codec
     compression: Option<CompressionCodec>,
+    /// Decompression context for reusing zstd decompressor state
+    decompression_context: DecompressionContext,
     /// The format version
     version: MetadataVersion,
     /// The raw data buffer
@@ -490,6 +495,7 @@ impl<'a> RecordBatchDecoder<'a> {
             schema,
             dictionaries_by_id,
             compression,
+            decompression_context: DecompressionContext::new(),
             version: *metadata,
             data: buf,
             nodes: field_nodes.iter(),
@@ -606,7 +612,12 @@ impl<'a> RecordBatchDecoder<'a> {
         let buffer = self.buffers.next().ok_or_else(|| {
             ArrowError::IpcError("Buffer count mismatched with 
metadata".to_string())
         })?;
-        read_buffer(buffer, self.data, self.compression)
+        read_buffer(
+            buffer,
+            self.data,
+            self.compression,
+            &mut self.decompression_context,
+        )
     }
 
     fn skip_buffer(&mut self) {
diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs
index 530838955c..fe2fb59c5b 100644
--- a/parquet/src/compression.rs
+++ b/parquet/src/compression.rs
@@ -503,20 +503,27 @@ pub use lz4_codec::*;
 
 #[cfg(any(feature = "zstd", test))]
 mod zstd_codec {
-    use std::io::{self, Write};
-
     use crate::compression::{Codec, ZstdLevel};
     use crate::errors::Result;
 
     /// Codec for Zstandard compression algorithm.
+    ///
+    /// Uses `zstd::bulk` API with reusable compressor/decompressor contexts
+    /// to avoid the overhead of reinitializing contexts for each operation.
     pub struct ZSTDCodec {
-        level: ZstdLevel,
+        compressor: zstd::bulk::Compressor<'static>,
+        decompressor: zstd::bulk::Decompressor<'static>,
     }
 
     impl ZSTDCodec {
         /// Creates new Zstandard compression codec.
         pub(crate) fn new(level: ZstdLevel) -> Self {
-            Self { level }
+            Self {
+                compressor: zstd::bulk::Compressor::new(level.compression_level())
+                    .expect("valid zstd compression level"),
+                decompressor: zstd::bulk::Decompressor::new()
+                    .expect("can create zstd decompressor"),
+            }
         }
     }
 
@@ -525,22 +532,25 @@ mod zstd_codec {
             &mut self,
             input_buf: &[u8],
             output_buf: &mut Vec<u8>,
-            _uncompress_size: Option<usize>,
+            uncompress_size: Option<usize>,
         ) -> Result<usize> {
-            let mut decoder = zstd::Decoder::new(input_buf)?;
-            match io::copy(&mut decoder, output_buf) {
-                Ok(n) => Ok(n as usize),
-                Err(e) => Err(e.into()),
-            }
+            let capacity = uncompress_size.unwrap_or_else(|| {
+                // Get the decompressed size from the zstd frame header
+                zstd::zstd_safe::get_frame_content_size(input_buf)
+                    .ok()
+                    .flatten()
+                    .unwrap_or(input_buf.len() as u64 * 4) as usize
+            });
+            let decompressed = self.decompressor.decompress(input_buf, capacity)?;
+            let len = decompressed.len();
+            output_buf.extend_from_slice(&decompressed);
+            Ok(len)
         }
 
         fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
-            let mut encoder = zstd::Encoder::new(output_buf, self.level.0)?;
-            encoder.write_all(input_buf)?;
-            match encoder.finish() {
-                Ok(_) => Ok(()),
-                Err(e) => Err(e.into()),
-            }
+            let compressed = self.compressor.compress(input_buf)?;
+            output_buf.extend_from_slice(&compressed);
+            Ok(())
         }
     }
 }
