This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b0ede4fbb Replace lz4 with lz4_flex Allowing Compilation for WASM 
(#4884)
3b0ede4fbb is described below

commit 3b0ede4fbb112b0d45d0ae3f03d8fc42c3ead631
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Oct 2 11:33:25 2023 +0100

    Replace lz4 with lz4_flex Allowing Compilation for WASM (#4884)
    
    * Use lz4_flex
    
    * Fix features
    
    * Install clang for zlib
    
    * Update arrow-ipc
    
    * Fix CI
    
    * Use LZ4F
    
    * Support LZ4F fallback
    
    * Restore support for LZ4F compressed CSV
    
    * Clippy
    
    * Fix features
    
    * Add benchmark
    
    * Additional system dependencies
---
 .github/workflows/parquet.yml      |   6 ++-
 arrow-integration-test/src/lib.rs  |   3 +-
 arrow-ipc/Cargo.toml               |   6 ++-
 arrow-ipc/src/compression.rs       |  78 ++++++++++++++++++----------
 parquet/Cargo.toml                 |  11 +++-
 parquet/benches/compression.rs     | 101 +++++++++++++++++++++++++++++++++++++
 parquet/src/bin/parquet-fromcsv.rs |  20 ++------
 parquet/src/compression.rs         |  36 ++++++-------
 8 files changed, 191 insertions(+), 70 deletions(-)

diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml
index c309a3fa64..7a649e16b1 100644
--- a/.github/workflows/parquet.yml
+++ b/.github/workflows/parquet.yml
@@ -123,10 +123,12 @@ jobs:
         uses: ./.github/actions/setup-builder
         with:
           target: wasm32-unknown-unknown,wasm32-wasi
+      - name: Install clang # Needed for zlib compilation
+        run: apt-get update && apt-get install -y clang gcc-multilib
       - name: Build wasm32-unknown-unknown
-        run: cargo build -p parquet --no-default-features --features 
cli,snap,flate2,brotli --target wasm32-unknown-unknown
+        run: cargo build -p parquet --target wasm32-unknown-unknown
       - name: Build wasm32-wasi
-        run: cargo build -p parquet --no-default-features --features 
cli,snap,flate2,brotli --target wasm32-wasi
+        run: cargo build -p parquet --target wasm32-wasi
 
   pyspark-integration-test:
     name: PySpark Integration Test
diff --git a/arrow-integration-test/src/lib.rs 
b/arrow-integration-test/src/lib.rs
index 04bbcf3f6f..07b69bffd0 100644
--- a/arrow-integration-test/src/lib.rs
+++ b/arrow-integration-test/src/lib.rs
@@ -183,7 +183,8 @@ impl ArrowJson {
                         return Ok(false);
                     }
                 }
-                _ => return Ok(false),
+                Some(Err(e)) => return Err(e),
+                None => return Ok(false),
             }
         }
 
diff --git a/arrow-ipc/Cargo.toml b/arrow-ipc/Cargo.toml
index a03f53d664..b5f66294a7 100644
--- a/arrow-ipc/Cargo.toml
+++ b/arrow-ipc/Cargo.toml
@@ -40,8 +40,12 @@ arrow-cast = { workspace = true }
 arrow-data = { workspace = true }
 arrow-schema = { workspace = true }
 flatbuffers = { version = "23.1.21", default-features = false }
-lz4 = { version = "1.23", default-features = false, optional = true }
+lz4_flex = { version = "0.11", default-features = false, features = ["std", 
"frame"], optional = true }
 zstd = { version = "0.12.0", default-features = false, optional = true }
 
+[features]
+default = []
+lz4 = ["lz4_flex"]
+
 [dev-dependencies]
 tempfile = "3.3"
diff --git a/arrow-ipc/src/compression.rs b/arrow-ipc/src/compression.rs
index db05e9a6a6..fafc2c5c9b 100644
--- a/arrow-ipc/src/compression.rs
+++ b/arrow-ipc/src/compression.rs
@@ -103,13 +103,15 @@ impl CompressionCodec {
         } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
             // no compression
             input.slice(LENGTH_OF_PREFIX_DATA as usize)
-        } else {
+        } else if let Ok(decompressed_length) = 
usize::try_from(decompressed_length) {
             // decompress data using the codec
-            let mut uncompressed_buffer =
-                Vec::with_capacity(decompressed_length as usize);
             let input_data = &input[(LENGTH_OF_PREFIX_DATA as usize)..];
-            self.decompress(input_data, &mut uncompressed_buffer)?;
-            Buffer::from(uncompressed_buffer)
+            self.decompress(input_data, decompressed_length as _)?
+                .into()
+        } else {
+            return Err(ArrowError::IpcError(format!(
+                "Invalid uncompressed length: {decompressed_length}"
+            )));
         };
         Ok(buffer)
     }
@@ -128,21 +130,30 @@ impl CompressionCodec {
     fn decompress(
         &self,
         input: &[u8],
-        output: &mut Vec<u8>,
-    ) -> Result<usize, ArrowError> {
-        match self {
-            CompressionCodec::Lz4Frame => decompress_lz4(input, output),
-            CompressionCodec::Zstd => decompress_zstd(input, output),
+        decompressed_size: usize,
+    ) -> Result<Vec<u8>, ArrowError> {
+        let ret = match self {
+            CompressionCodec::Lz4Frame => decompress_lz4(input, 
decompressed_size)?,
+            CompressionCodec::Zstd => decompress_zstd(input, 
decompressed_size)?,
+        };
+        if ret.len() != decompressed_size {
+            return Err(ArrowError::IpcError(format!(
+                "Expected compressed length of {decompressed_size} got {}",
+                ret.len()
+            )));
         }
+        Ok(ret)
     }
 }
 
 #[cfg(feature = "lz4")]
 fn compress_lz4(input: &[u8], output: &mut Vec<u8>) -> Result<(), ArrowError> {
     use std::io::Write;
-    let mut encoder = lz4::EncoderBuilder::new().build(output)?;
+    let mut encoder = lz4_flex::frame::FrameEncoder::new(output);
     encoder.write_all(input)?;
-    encoder.finish().1?;
+    encoder
+        .finish()
+        .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
     Ok(())
 }
 
@@ -155,14 +166,19 @@ fn compress_lz4(_input: &[u8], _output: &mut Vec<u8>) -> 
Result<(), ArrowError>
 }
 
 #[cfg(feature = "lz4")]
-fn decompress_lz4(input: &[u8], output: &mut Vec<u8>) -> Result<usize, 
ArrowError> {
+fn decompress_lz4(input: &[u8], decompressed_size: usize) -> Result<Vec<u8>, 
ArrowError> {
     use std::io::Read;
-    Ok(lz4::Decoder::new(input)?.read_to_end(output)?)
+    let mut output = Vec::with_capacity(decompressed_size);
+    lz4_flex::frame::FrameDecoder::new(input).read_to_end(&mut output)?;
+    Ok(output)
 }
 
 #[cfg(not(feature = "lz4"))]
 #[allow(clippy::ptr_arg)]
-fn decompress_lz4(_input: &[u8], _output: &mut Vec<u8>) -> Result<usize, 
ArrowError> {
+fn decompress_lz4(
+    _input: &[u8],
+    _decompressed_size: usize,
+) -> Result<Vec<u8>, ArrowError> {
     Err(ArrowError::InvalidArgumentError(
         "lz4 IPC decompression requires the lz4 feature".to_string(),
     ))
@@ -186,14 +202,22 @@ fn compress_zstd(_input: &[u8], _output: &mut Vec<u8>) -> 
Result<(), ArrowError>
 }
 
 #[cfg(feature = "zstd")]
-fn decompress_zstd(input: &[u8], output: &mut Vec<u8>) -> Result<usize, 
ArrowError> {
+fn decompress_zstd(
+    input: &[u8],
+    decompressed_size: usize,
+) -> Result<Vec<u8>, ArrowError> {
     use std::io::Read;
-    Ok(zstd::Decoder::new(input)?.read_to_end(output)?)
+    let mut output = Vec::with_capacity(decompressed_size);
+    zstd::Decoder::with_buffer(input)?.read_to_end(&mut output)?;
+    Ok(output)
 }
 
 #[cfg(not(feature = "zstd"))]
 #[allow(clippy::ptr_arg)]
-fn decompress_zstd(_input: &[u8], _output: &mut Vec<u8>) -> Result<usize, 
ArrowError> {
+fn decompress_zstd(
+    _input: &[u8],
+    _decompressed_size: usize,
+) -> Result<Vec<u8>, ArrowError> {
     Err(ArrowError::InvalidArgumentError(
         "zstd IPC decompression requires the zstd feature".to_string(),
     ))
@@ -216,28 +240,26 @@ mod tests {
     #[test]
     #[cfg(feature = "lz4")]
     fn test_lz4_compression() {
-        let input_bytes = "hello lz4".as_bytes();
+        let input_bytes = b"hello lz4";
         let codec = super::CompressionCodec::Lz4Frame;
         let mut output_bytes: Vec<u8> = Vec::new();
         codec.compress(input_bytes, &mut output_bytes).unwrap();
-        let mut result_output_bytes: Vec<u8> = Vec::new();
-        codec
-            .decompress(output_bytes.as_slice(), &mut result_output_bytes)
+        let result = codec
+            .decompress(output_bytes.as_slice(), input_bytes.len())
             .unwrap();
-        assert_eq!(input_bytes, result_output_bytes.as_slice());
+        assert_eq!(input_bytes, result.as_slice());
     }
 
     #[test]
     #[cfg(feature = "zstd")]
     fn test_zstd_compression() {
-        let input_bytes = "hello zstd".as_bytes();
+        let input_bytes = b"hello zstd";
         let codec = super::CompressionCodec::Zstd;
         let mut output_bytes: Vec<u8> = Vec::new();
         codec.compress(input_bytes, &mut output_bytes).unwrap();
-        let mut result_output_bytes: Vec<u8> = Vec::new();
-        codec
-            .decompress(output_bytes.as_slice(), &mut result_output_bytes)
+        let result = codec
+            .decompress(output_bytes.as_slice(), input_bytes.len())
             .unwrap();
-        assert_eq!(input_bytes, result_output_bytes.as_slice());
+        assert_eq!(input_bytes, result.as_slice());
     }
 }
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 7c346248ac..c710c83213 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -51,7 +51,7 @@ thrift = { version = "0.17", default-features = false }
 snap = { version = "1.0", default-features = false, optional = true }
 brotli = { version = "3.3", default-features = false, features = ["std"], 
optional = true }
 flate2 = { version = "1.0", default-features = false, features = 
["rust_backend"], optional = true }
-lz4 = { version = "1.23", default-features = false, optional = true }
+lz4_flex = { version = "0.11", default-features = false, features = ["std", 
"frame"], optional = true }
 zstd = { version = "0.12.0", optional = true, default-features = false }
 chrono = { workspace = true }
 num = { version = "0.4", default-features = false }
@@ -74,7 +74,7 @@ snap = { version = "1.0", default-features = false }
 tempfile = { version = "3.0", default-features = false }
 brotli = { version = "3.3", default-features = false, features = ["std"] }
 flate2 = { version = "1.0", default-features = false, features = 
["rust_backend"] }
-lz4 = { version = "1.23", default-features = false }
+lz4_flex = { version = "0.11", default-features = false, features = ["std", 
"frame"] }
 zstd = { version = "0.12", default-features = false }
 serde_json = { version = "1.0", features = ["std"], default-features = false }
 arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", 
"json"] }
@@ -86,6 +86,8 @@ all-features = true
 
 [features]
 default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
+# Enable lz4
+lz4 = ["lz4_flex"]
 # Enable arrow reader/writer APIs
 arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", 
"arrow-schema", "arrow-select", "arrow-ipc"]
 # Enable CLI tools
@@ -166,5 +168,10 @@ name = "arrow_reader"
 required-features = ["arrow", "test_common", "experimental"]
 harness = false
 
+[[bench]]
+name = "compression"
+required-features = ["experimental", "default"]
+harness = false
+
 [lib]
 bench = false
diff --git a/parquet/benches/compression.rs b/parquet/benches/compression.rs
new file mode 100644
index 0000000000..ce4f9aead7
--- /dev/null
+++ b/parquet/benches/compression.rs
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use criterion::*;
+use parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
+use parquet::compression::create_codec;
+use rand::distributions::Alphanumeric;
+use rand::prelude::*;
+
+fn do_bench(c: &mut Criterion, name: &str, uncompressed: &[u8]) {
+    let codecs = [
+        Compression::BROTLI(BrotliLevel::default()),
+        Compression::GZIP(GzipLevel::default()),
+        Compression::LZ4,
+        Compression::LZ4_RAW,
+        Compression::SNAPPY,
+        Compression::GZIP(GzipLevel::default()),
+        Compression::ZSTD(ZstdLevel::default()),
+    ];
+
+    for compression in codecs {
+        let mut codec = create_codec(compression, &Default::default())
+            .unwrap()
+            .unwrap();
+
+        c.bench_function(&format!("compress {compression} - {name}"), |b| {
+            b.iter(|| {
+                let mut out = Vec::new();
+                codec.compress(uncompressed, &mut out).unwrap();
+                out
+            });
+        });
+
+        let mut compressed = Vec::new();
+        codec.compress(uncompressed, &mut compressed).unwrap();
+        println!(
+            "{compression} compressed {} bytes of {name} to {} bytes",
+            uncompressed.len(),
+            compressed.len()
+        );
+
+        c.bench_function(&format!("decompress {compression} - {name}"), |b| {
+            b.iter(|| {
+                let mut out = Vec::new();
+                codec
+                    .decompress(
+                        black_box(&compressed),
+                        &mut out,
+                        Some(uncompressed.len()),
+                    )
+                    .unwrap();
+                out
+            });
+        });
+    }
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let mut rng = StdRng::seed_from_u64(42);
+    let rng = &mut rng;
+    const DATA_SIZE: usize = 1024 * 1024;
+
+    let uncompressed: Vec<_> = 
rng.sample_iter(&Alphanumeric).take(DATA_SIZE).collect();
+    do_bench(c, "alphanumeric", &uncompressed);
+
+    // Create a collection of 64 words
+    let words: Vec<Vec<_>> = (0..64)
+        .map(|_| {
+            let len = rng.gen_range(1..12);
+            rng.sample_iter(&Alphanumeric).take(len).collect()
+        })
+        .collect();
+
+    // Build data by concatenating these words randomly together
+    let mut uncompressed = Vec::with_capacity(DATA_SIZE);
+    while uncompressed.len() < DATA_SIZE {
+        let word = &words[rng.gen_range(0..words.len())];
+        uncompressed
+            .extend_from_slice(&word[..word.len().min(DATA_SIZE - 
uncompressed.len())])
+    }
+    assert_eq!(uncompressed.len(), DATA_SIZE);
+
+    do_bench(c, "words", &uncompressed);
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/parquet/src/bin/parquet-fromcsv.rs 
b/parquet/src/bin/parquet-fromcsv.rs
index 1ff6fecf5a..548bbdbfb8 100644
--- a/parquet/src/bin/parquet-fromcsv.rs
+++ b/parquet/src/bin/parquet-fromcsv.rs
@@ -386,9 +386,9 @@ fn convert_csv_to_parquet(args: &Args) -> Result<(), 
ParquetFromCsvError> {
         Compression::BROTLI(_) => {
             Box::new(brotli::Decompressor::new(input_file, 0)) as Box<dyn Read>
         }
-        Compression::LZ4 => Box::new(lz4::Decoder::new(input_file).map_err(|e| 
{
-            ParquetFromCsvError::with_context(e, "Failed to create 
lz4::Decoder")
-        })?) as Box<dyn Read>,
+        Compression::LZ4 => {
+            Box::new(lz4_flex::frame::FrameDecoder::new(input_file)) as 
Box<dyn Read>
+        }
         Compression::ZSTD(_) => 
Box::new(zstd::Decoder::new(input_file).map_err(|e| {
             ParquetFromCsvError::with_context(e, "Failed to create 
zstd::Decoder")
         })?) as Box<dyn Read>,
@@ -692,19 +692,9 @@ mod tests {
                 encoder.into_inner()
             }
             Compression::LZ4 => {
-                let mut encoder = lz4::EncoderBuilder::new()
-                    .build(input_file)
-                    .map_err(|e| {
-                        ParquetFromCsvError::with_context(
-                            e,
-                            "Failed to create lz4::Encoder",
-                        )
-                    })
-                    .unwrap();
+                let mut encoder = 
lz4_flex::frame::FrameEncoder::new(input_file);
                 write_tmp_file(&mut encoder);
-                let (inner, err) = encoder.finish();
-                err.unwrap();
-                inner
+                encoder.finish().unwrap()
             }
 
             Compression::ZSTD(level) => {
diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs
index f1831ed484..9e0eee0e3e 100644
--- a/parquet/src/compression.rs
+++ b/parquet/src/compression.rs
@@ -388,7 +388,7 @@ mod lz4_codec {
     use std::io::{Read, Write};
 
     use crate::compression::Codec;
-    use crate::errors::Result;
+    use crate::errors::{ParquetError, Result};
 
     const LZ4_BUFFER_SIZE: usize = 4096;
 
@@ -409,7 +409,7 @@ mod lz4_codec {
             output_buf: &mut Vec<u8>,
             _uncompress_size: Option<usize>,
         ) -> Result<usize> {
-            let mut decoder = lz4::Decoder::new(input_buf)?;
+            let mut decoder = lz4_flex::frame::FrameDecoder::new(input_buf);
             let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE];
             let mut total_len = 0;
             loop {
@@ -424,7 +424,7 @@ mod lz4_codec {
         }
 
         fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> 
Result<()> {
-            let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?;
+            let mut encoder = lz4_flex::frame::FrameEncoder::new(output_buf);
             let mut from = 0;
             loop {
                 let to = std::cmp::min(from + LZ4_BUFFER_SIZE, 
input_buf.len());
@@ -434,7 +434,10 @@ mod lz4_codec {
                     break;
                 }
             }
-            encoder.finish().1.map_err(|e| e.into())
+            match encoder.finish() {
+                Ok(_) => Ok(()),
+                Err(e) => Err(ParquetError::External(Box::new(e))),
+            }
         }
     }
 }
@@ -551,11 +554,7 @@ mod lz4_raw_codec {
                 }
             };
             output_buf.resize(offset + required_len, 0);
-            match lz4::block::decompress_to_buffer(
-                input_buf,
-                Some(required_len.try_into().unwrap()),
-                &mut output_buf[offset..],
-            ) {
+            match lz4_flex::block::decompress_into(input_buf, &mut 
output_buf[offset..]) {
                 Ok(n) => {
                     if n != required_len {
                         return Err(ParquetError::General(
@@ -564,25 +563,20 @@ mod lz4_raw_codec {
                     }
                     Ok(n)
                 }
-                Err(e) => Err(e.into()),
+                Err(e) => Err(ParquetError::External(Box::new(e))),
             }
         }
 
         fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> 
Result<()> {
             let offset = output_buf.len();
-            let required_len = lz4::block::compress_bound(input_buf.len())?;
+            let required_len = 
lz4_flex::block::get_maximum_output_size(input_buf.len());
             output_buf.resize(offset + required_len, 0);
-            match lz4::block::compress_to_buffer(
-                input_buf,
-                None,
-                false,
-                &mut output_buf[offset..],
-            ) {
+            match lz4_flex::block::compress_into(input_buf, &mut 
output_buf[offset..]) {
                 Ok(n) => {
                     output_buf.truncate(offset + n);
                     Ok(())
                 }
-                Err(e) => Err(e.into()),
+                Err(e) => Err(ParquetError::External(Box::new(e))),
             }
         }
     }
@@ -666,11 +660,11 @@ mod lz4_hadoop_codec {
                     "Not enough bytes to hold advertised output",
                 ));
             }
-            let decompressed_size = lz4::block::decompress_to_buffer(
+            let decompressed_size = lz4_flex::decompress_into(
                 &input[..expected_compressed_size as usize],
-                Some(output_len as i32),
                 output,
-            )?;
+            )
+            .map_err(|e| ParquetError::External(Box::new(e)))?;
             if decompressed_size != expected_decompressed_size as usize {
                 return Err(io::Error::new(
                     io::ErrorKind::Other,

Reply via email to