This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 3b0ede4fbb Replace lz4 with lz4_flex Allowing Compilation for WASM
(#4884)
3b0ede4fbb is described below
commit 3b0ede4fbb112b0d45d0ae3f03d8fc42c3ead631
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Oct 2 11:33:25 2023 +0100
Replace lz4 with lz4_flex Allowing Compilation for WASM (#4884)
* Use lz4_flex
* Fix features
* Install clang for zlib
* Update arrow-ipc
* Fix CI
* Use LZ4F
* Support LZ4F fallback
* Restore support for LZ4F compressed CSV
* Clippy
* Fix features
* Add benchmark
* Additional system dependencies
---
.github/workflows/parquet.yml | 6 ++-
arrow-integration-test/src/lib.rs | 3 +-
arrow-ipc/Cargo.toml | 6 ++-
arrow-ipc/src/compression.rs | 78 ++++++++++++++++++----------
parquet/Cargo.toml | 11 +++-
parquet/benches/compression.rs | 101 +++++++++++++++++++++++++++++++++++++
parquet/src/bin/parquet-fromcsv.rs | 20 ++------
parquet/src/compression.rs | 36 ++++++-------
8 files changed, 191 insertions(+), 70 deletions(-)
diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml
index c309a3fa64..7a649e16b1 100644
--- a/.github/workflows/parquet.yml
+++ b/.github/workflows/parquet.yml
@@ -123,10 +123,12 @@ jobs:
uses: ./.github/actions/setup-builder
with:
target: wasm32-unknown-unknown,wasm32-wasi
+ - name: Install clang # Needed to compile the zstd-sys C sources for wasm targets
+ run: apt-get update && apt-get install -y clang gcc-multilib
- name: Build wasm32-unknown-unknown
- run: cargo build -p parquet --no-default-features --features
cli,snap,flate2,brotli --target wasm32-unknown-unknown
+ run: cargo build -p parquet --target wasm32-unknown-unknown
- name: Build wasm32-wasi
- run: cargo build -p parquet --no-default-features --features
cli,snap,flate2,brotli --target wasm32-wasi
+ run: cargo build -p parquet --target wasm32-wasi
pyspark-integration-test:
name: PySpark Integration Test
diff --git a/arrow-integration-test/src/lib.rs
b/arrow-integration-test/src/lib.rs
index 04bbcf3f6f..07b69bffd0 100644
--- a/arrow-integration-test/src/lib.rs
+++ b/arrow-integration-test/src/lib.rs
@@ -183,7 +183,8 @@ impl ArrowJson {
return Ok(false);
}
}
- _ => return Ok(false),
+ Some(Err(e)) => return Err(e),
+ None => return Ok(false),
}
}
diff --git a/arrow-ipc/Cargo.toml b/arrow-ipc/Cargo.toml
index a03f53d664..b5f66294a7 100644
--- a/arrow-ipc/Cargo.toml
+++ b/arrow-ipc/Cargo.toml
@@ -40,8 +40,12 @@ arrow-cast = { workspace = true }
arrow-data = { workspace = true }
arrow-schema = { workspace = true }
flatbuffers = { version = "23.1.21", default-features = false }
-lz4 = { version = "1.23", default-features = false, optional = true }
+lz4_flex = { version = "0.11", default-features = false, features = ["std",
"frame"], optional = true }
zstd = { version = "0.12.0", default-features = false, optional = true }
+[features]
+default = []
+lz4 = ["lz4_flex"]
+
[dev-dependencies]
tempfile = "3.3"
diff --git a/arrow-ipc/src/compression.rs b/arrow-ipc/src/compression.rs
index db05e9a6a6..fafc2c5c9b 100644
--- a/arrow-ipc/src/compression.rs
+++ b/arrow-ipc/src/compression.rs
@@ -103,13 +103,15 @@ impl CompressionCodec {
} else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
// no compression
input.slice(LENGTH_OF_PREFIX_DATA as usize)
- } else {
+ } else if let Ok(decompressed_length) =
usize::try_from(decompressed_length) {
// decompress data using the codec
- let mut uncompressed_buffer =
- Vec::with_capacity(decompressed_length as usize);
let input_data = &input[(LENGTH_OF_PREFIX_DATA as usize)..];
- self.decompress(input_data, &mut uncompressed_buffer)?;
- Buffer::from(uncompressed_buffer)
+ self.decompress(input_data, decompressed_length as _)?
+ .into()
+ } else {
+ return Err(ArrowError::IpcError(format!(
+ "Invalid uncompressed length: {decompressed_length}"
+ )));
};
Ok(buffer)
}
@@ -128,21 +130,30 @@ impl CompressionCodec {
+    /// Decompresses `input` with this codec.
+    ///
+    /// `decompressed_size` is the uncompressed length read from the buffer's
+    /// length prefix; it pre-sizes the output and is used to validate it.
+    /// Returns [`ArrowError::IpcError`] if the decompressed output does not
+    /// have exactly `decompressed_size` bytes.
     fn decompress(
         &self,
         input: &[u8],
-        output: &mut Vec<u8>,
-    ) -> Result<usize, ArrowError> {
-        match self {
-            CompressionCodec::Lz4Frame => decompress_lz4(input, output),
-            CompressionCodec::Zstd => decompress_zstd(input, output),
+        decompressed_size: usize,
+    ) -> Result<Vec<u8>, ArrowError> {
+        let ret = match self {
+            CompressionCodec::Lz4Frame => decompress_lz4(input, decompressed_size)?,
+            CompressionCodec::Zstd => decompress_zstd(input, decompressed_size)?,
+        };
+        // The codec output must match the length advertised in the buffer
+        // prefix; anything else indicates truncated or corrupt input.
+        if ret.len() != decompressed_size {
+            return Err(ArrowError::IpcError(format!(
+                "Expected decompressed length of {decompressed_size} got {}",
+                ret.len()
+            )));
         }
+        Ok(ret)
     }
}
#[cfg(feature = "lz4")]
fn compress_lz4(input: &[u8], output: &mut Vec<u8>) -> Result<(), ArrowError> {
use std::io::Write;
- let mut encoder = lz4::EncoderBuilder::new().build(output)?;
+ let mut encoder = lz4_flex::frame::FrameEncoder::new(output);
encoder.write_all(input)?;
- encoder.finish().1?;
+ encoder
+ .finish()
+ .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
Ok(())
}
@@ -155,14 +166,19 @@ fn compress_lz4(_input: &[u8], _output: &mut Vec<u8>) ->
Result<(), ArrowError>
}
#[cfg(feature = "lz4")]
-fn decompress_lz4(input: &[u8], output: &mut Vec<u8>) -> Result<usize,
ArrowError> {
+fn decompress_lz4(input: &[u8], decompressed_size: usize) -> Result<Vec<u8>,
ArrowError> {
use std::io::Read;
- Ok(lz4::Decoder::new(input)?.read_to_end(output)?)
+ let mut output = Vec::with_capacity(decompressed_size);
+ lz4_flex::frame::FrameDecoder::new(input).read_to_end(&mut output)?;
+ Ok(output)
}
#[cfg(not(feature = "lz4"))]
#[allow(clippy::ptr_arg)]
-fn decompress_lz4(_input: &[u8], _output: &mut Vec<u8>) -> Result<usize,
ArrowError> {
+fn decompress_lz4(
+ _input: &[u8],
+ _decompressed_size: usize,
+) -> Result<Vec<u8>, ArrowError> {
Err(ArrowError::InvalidArgumentError(
"lz4 IPC decompression requires the lz4 feature".to_string(),
))
@@ -186,14 +202,22 @@ fn compress_zstd(_input: &[u8], _output: &mut Vec<u8>) ->
Result<(), ArrowError>
}
#[cfg(feature = "zstd")]
-fn decompress_zstd(input: &[u8], output: &mut Vec<u8>) -> Result<usize,
ArrowError> {
+fn decompress_zstd(
+ input: &[u8],
+ decompressed_size: usize,
+) -> Result<Vec<u8>, ArrowError> {
use std::io::Read;
- Ok(zstd::Decoder::new(input)?.read_to_end(output)?)
+ let mut output = Vec::with_capacity(decompressed_size);
+ zstd::Decoder::with_buffer(input)?.read_to_end(&mut output)?;
+ Ok(output)
}
#[cfg(not(feature = "zstd"))]
#[allow(clippy::ptr_arg)]
-fn decompress_zstd(_input: &[u8], _output: &mut Vec<u8>) -> Result<usize,
ArrowError> {
+fn decompress_zstd(
+ _input: &[u8],
+ _decompressed_size: usize,
+) -> Result<Vec<u8>, ArrowError> {
Err(ArrowError::InvalidArgumentError(
"zstd IPC decompression requires the zstd feature".to_string(),
))
@@ -216,28 +240,26 @@ mod tests {
#[test]
#[cfg(feature = "lz4")]
fn test_lz4_compression() {
- let input_bytes = "hello lz4".as_bytes();
+ let input_bytes = b"hello lz4";
let codec = super::CompressionCodec::Lz4Frame;
let mut output_bytes: Vec<u8> = Vec::new();
codec.compress(input_bytes, &mut output_bytes).unwrap();
- let mut result_output_bytes: Vec<u8> = Vec::new();
- codec
- .decompress(output_bytes.as_slice(), &mut result_output_bytes)
+ let result = codec
+ .decompress(output_bytes.as_slice(), input_bytes.len())
.unwrap();
- assert_eq!(input_bytes, result_output_bytes.as_slice());
+ assert_eq!(input_bytes, result.as_slice());
}
#[test]
#[cfg(feature = "zstd")]
fn test_zstd_compression() {
- let input_bytes = "hello zstd".as_bytes();
+ let input_bytes = b"hello zstd";
let codec = super::CompressionCodec::Zstd;
let mut output_bytes: Vec<u8> = Vec::new();
codec.compress(input_bytes, &mut output_bytes).unwrap();
- let mut result_output_bytes: Vec<u8> = Vec::new();
- codec
- .decompress(output_bytes.as_slice(), &mut result_output_bytes)
+ let result = codec
+ .decompress(output_bytes.as_slice(), input_bytes.len())
.unwrap();
- assert_eq!(input_bytes, result_output_bytes.as_slice());
+ assert_eq!(input_bytes, result.as_slice());
}
}
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 7c346248ac..c710c83213 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -51,7 +51,7 @@ thrift = { version = "0.17", default-features = false }
snap = { version = "1.0", default-features = false, optional = true }
brotli = { version = "3.3", default-features = false, features = ["std"],
optional = true }
flate2 = { version = "1.0", default-features = false, features =
["rust_backend"], optional = true }
-lz4 = { version = "1.23", default-features = false, optional = true }
+lz4_flex = { version = "0.11", default-features = false, features = ["std",
"frame"], optional = true }
zstd = { version = "0.12.0", optional = true, default-features = false }
chrono = { workspace = true }
num = { version = "0.4", default-features = false }
@@ -74,7 +74,7 @@ snap = { version = "1.0", default-features = false }
tempfile = { version = "3.0", default-features = false }
brotli = { version = "3.3", default-features = false, features = ["std"] }
flate2 = { version = "1.0", default-features = false, features =
["rust_backend"] }
-lz4 = { version = "1.23", default-features = false }
+lz4_flex = { version = "0.11", default-features = false, features = ["std",
"frame"] }
zstd = { version = "0.12", default-features = false }
serde_json = { version = "1.0", features = ["std"], default-features = false }
arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint",
"json"] }
@@ -86,6 +86,8 @@ all-features = true
[features]
default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
+# Enable lz4
+lz4 = ["lz4_flex"]
# Enable arrow reader/writer APIs
arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data",
"arrow-schema", "arrow-select", "arrow-ipc"]
# Enable CLI tools
@@ -166,5 +168,10 @@ name = "arrow_reader"
required-features = ["arrow", "test_common", "experimental"]
harness = false
+[[bench]]
+name = "compression"
+required-features = ["experimental", "default"]
+harness = false
+
[lib]
bench = false
diff --git a/parquet/benches/compression.rs b/parquet/benches/compression.rs
new file mode 100644
index 0000000000..ce4f9aead7
--- /dev/null
+++ b/parquet/benches/compression.rs
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use criterion::*;
+use parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
+use parquet::compression::create_codec;
+use rand::distributions::Alphanumeric;
+use rand::prelude::*;
+
+/// Benchmarks compression and decompression of `uncompressed` with each codec
+/// in the list below, labelling every benchmark with the data set `name`.
+///
+/// The compressed size per codec is printed so compression ratios can be
+/// compared alongside the measured throughput.
+fn do_bench(c: &mut Criterion, name: &str, uncompressed: &[u8]) {
+    // Each codec is listed exactly once so no codec is measured twice and the
+    // benchmark IDs built from `compression` stay unique.
+    let codecs = [
+        Compression::BROTLI(BrotliLevel::default()),
+        Compression::GZIP(GzipLevel::default()),
+        Compression::LZ4,
+        Compression::LZ4_RAW,
+        Compression::SNAPPY,
+        Compression::ZSTD(ZstdLevel::default()),
+    ];
+
+    for compression in codecs {
+        let mut codec = create_codec(compression, &Default::default())
+            .unwrap()
+            .unwrap();
+
+        c.bench_function(&format!("compress {compression} - {name}"), |b| {
+            b.iter(|| {
+                let mut out = Vec::new();
+                // black_box the input, as the decompress benchmark does, so the
+                // optimizer cannot specialise on the constant buffer
+                codec.compress(black_box(uncompressed), &mut out).unwrap();
+                out
+            });
+        });
+
+        let mut compressed = Vec::new();
+        codec.compress(uncompressed, &mut compressed).unwrap();
+        println!(
+            "{compression} compressed {} bytes of {name} to {} bytes",
+            uncompressed.len(),
+            compressed.len()
+        );
+
+        c.bench_function(&format!("decompress {compression} - {name}"), |b| {
+            b.iter(|| {
+                let mut out = Vec::new();
+                codec
+                    .decompress(
+                        black_box(&compressed),
+                        &mut out,
+                        Some(uncompressed.len()),
+                    )
+                    .unwrap();
+                out
+            });
+        });
+    }
+}
+
+/// Builds the benchmark inputs and runs [`do_bench`] over each of them.
+///
+/// Both inputs are `1024 * 1024` bytes and come from an RNG seeded with 42, so
+/// every run sees identical data:
+/// * `alphanumeric` - uniformly random alphanumeric bytes
+/// * `words` - random concatenations of a 64-entry vocabulary of random
+///   alphanumeric words, each 1 to 11 bytes long
+fn criterion_benchmark(c: &mut Criterion) {
+    const DATA_SIZE: usize = 1024 * 1024;
+    let mut rng = StdRng::seed_from_u64(42);
+
+    let alphanumeric: Vec<u8> = (&mut rng)
+        .sample_iter(&Alphanumeric)
+        .take(DATA_SIZE)
+        .collect();
+    do_bench(c, "alphanumeric", &alphanumeric);
+
+    // Vocabulary of 64 random words; `gen_range(1..12)` excludes 12
+    let vocabulary: Vec<Vec<u8>> = (0..64)
+        .map(|_| {
+            let word_len = rng.gen_range(1..12);
+            (&mut rng).sample_iter(&Alphanumeric).take(word_len).collect()
+        })
+        .collect();
+
+    // Append randomly picked words until exactly DATA_SIZE bytes are present,
+    // cutting the last word short if it would overshoot
+    let mut text: Vec<u8> = Vec::with_capacity(DATA_SIZE);
+    loop {
+        let remaining = DATA_SIZE - text.len();
+        if remaining == 0 {
+            break;
+        }
+        let word = &vocabulary[rng.gen_range(0..vocabulary.len())];
+        text.extend(word.iter().take(remaining));
+    }
+    assert_eq!(text.len(), DATA_SIZE);
+
+    do_bench(c, "words", &text);
+}
+
+// Register `criterion_benchmark` with criterion. `criterion_main!` supplies the
+// `main` entry point, which this bench needs because Cargo.toml sets
+// `harness = false` for it.
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/parquet/src/bin/parquet-fromcsv.rs
b/parquet/src/bin/parquet-fromcsv.rs
index 1ff6fecf5a..548bbdbfb8 100644
--- a/parquet/src/bin/parquet-fromcsv.rs
+++ b/parquet/src/bin/parquet-fromcsv.rs
@@ -386,9 +386,9 @@ fn convert_csv_to_parquet(args: &Args) -> Result<(),
ParquetFromCsvError> {
Compression::BROTLI(_) => {
Box::new(brotli::Decompressor::new(input_file, 0)) as Box<dyn Read>
}
- Compression::LZ4 => Box::new(lz4::Decoder::new(input_file).map_err(|e|
{
- ParquetFromCsvError::with_context(e, "Failed to create
lz4::Decoder")
- })?) as Box<dyn Read>,
+ Compression::LZ4 => {
+ Box::new(lz4_flex::frame::FrameDecoder::new(input_file)) as
Box<dyn Read>
+ }
Compression::ZSTD(_) =>
Box::new(zstd::Decoder::new(input_file).map_err(|e| {
ParquetFromCsvError::with_context(e, "Failed to create
zstd::Decoder")
})?) as Box<dyn Read>,
@@ -692,19 +692,9 @@ mod tests {
encoder.into_inner()
}
Compression::LZ4 => {
- let mut encoder = lz4::EncoderBuilder::new()
- .build(input_file)
- .map_err(|e| {
- ParquetFromCsvError::with_context(
- e,
- "Failed to create lz4::Encoder",
- )
- })
- .unwrap();
+ let mut encoder =
lz4_flex::frame::FrameEncoder::new(input_file);
write_tmp_file(&mut encoder);
- let (inner, err) = encoder.finish();
- err.unwrap();
- inner
+ encoder.finish().unwrap()
}
Compression::ZSTD(level) => {
diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs
index f1831ed484..9e0eee0e3e 100644
--- a/parquet/src/compression.rs
+++ b/parquet/src/compression.rs
@@ -388,7 +388,7 @@ mod lz4_codec {
use std::io::{Read, Write};
use crate::compression::Codec;
- use crate::errors::Result;
+ use crate::errors::{ParquetError, Result};
const LZ4_BUFFER_SIZE: usize = 4096;
@@ -409,7 +409,7 @@ mod lz4_codec {
output_buf: &mut Vec<u8>,
_uncompress_size: Option<usize>,
) -> Result<usize> {
- let mut decoder = lz4::Decoder::new(input_buf)?;
+ let mut decoder = lz4_flex::frame::FrameDecoder::new(input_buf);
let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE];
let mut total_len = 0;
loop {
@@ -424,7 +424,7 @@ mod lz4_codec {
}
fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) ->
Result<()> {
- let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?;
+ let mut encoder = lz4_flex::frame::FrameEncoder::new(output_buf);
let mut from = 0;
loop {
let to = std::cmp::min(from + LZ4_BUFFER_SIZE,
input_buf.len());
@@ -434,7 +434,10 @@ mod lz4_codec {
break;
}
}
- encoder.finish().1.map_err(|e| e.into())
+ match encoder.finish() {
+ Ok(_) => Ok(()),
+ Err(e) => Err(ParquetError::External(Box::new(e))),
+ }
}
}
}
@@ -551,11 +554,7 @@ mod lz4_raw_codec {
}
};
output_buf.resize(offset + required_len, 0);
- match lz4::block::decompress_to_buffer(
- input_buf,
- Some(required_len.try_into().unwrap()),
- &mut output_buf[offset..],
- ) {
+ match lz4_flex::block::decompress_into(input_buf, &mut
output_buf[offset..]) {
Ok(n) => {
if n != required_len {
return Err(ParquetError::General(
@@ -564,25 +563,20 @@ mod lz4_raw_codec {
}
Ok(n)
}
- Err(e) => Err(e.into()),
+ Err(e) => Err(ParquetError::External(Box::new(e))),
}
}
fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) ->
Result<()> {
let offset = output_buf.len();
- let required_len = lz4::block::compress_bound(input_buf.len())?;
+ let required_len =
lz4_flex::block::get_maximum_output_size(input_buf.len());
output_buf.resize(offset + required_len, 0);
- match lz4::block::compress_to_buffer(
- input_buf,
- None,
- false,
- &mut output_buf[offset..],
- ) {
+ match lz4_flex::block::compress_into(input_buf, &mut
output_buf[offset..]) {
Ok(n) => {
output_buf.truncate(offset + n);
Ok(())
}
- Err(e) => Err(e.into()),
+ Err(e) => Err(ParquetError::External(Box::new(e))),
}
}
}
@@ -666,11 +660,11 @@ mod lz4_hadoop_codec {
"Not enough bytes to hold advertised output",
));
}
- let decompressed_size = lz4::block::decompress_to_buffer(
+ let decompressed_size = lz4_flex::decompress_into(
&input[..expected_compressed_size as usize],
- Some(output_len as i32),
output,
- )?;
+ )
+ .map_err(|e| ParquetError::External(Box::new(e)))?;
if decompressed_size != expected_decompressed_size as usize {
return Err(io::Error::new(
io::ErrorKind::Other,