This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 202e9a1c29 Avro block decompression (#5306)
202e9a1c29 is described below
commit 202e9a1c29ef93d0bf4c42ea8c4c3733b44721ab
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Jan 17 16:23:42 2024 +0000
Avro block decompression (#5306)
* Avro block decompression
* Clippy
* Update arrow-avro/src/reader/mod.rs
Co-authored-by: Marco Neumann <[email protected]>
---------
Co-authored-by: Marco Neumann <[email protected]>
---
arrow-avro/Cargo.toml | 10 +++++++
arrow-avro/src/compression.rs | 63 +++++++++++++++++++++++++++++++++++++----
arrow-avro/src/reader/header.rs | 30 +++++++++++++++++++-
arrow-avro/src/reader/mod.rs | 28 ++++++++++++++----
testing | 2 +-
5 files changed, 119 insertions(+), 14 deletions(-)
diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
index 9575874c41..d2436f0c15 100644
--- a/arrow-avro/Cargo.toml
+++ b/arrow-avro/Cargo.toml
@@ -33,6 +33,11 @@ name = "arrow_avro"
path = "src/lib.rs"
bench = false
+[features]
+default = ["deflate", "snappy", "zstd"]
+deflate = ["flate2"]
+snappy = ["snap", "crc"]
+
[dependencies]
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
@@ -41,6 +46,11 @@ arrow-data = { workspace = true }
arrow-schema = { workspace = true }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
serde = { version = "1.0.188", features = ["derive"] }
+flate2 = { version = "1.0", default-features = false, features =
["rust_backend"], optional = true }
+snap = { version = "1.0", default-features = false, optional = true }
+zstd = { version = "0.13", default-features = false, optional = true }
+crc = { version = "3.0", optional = true }
+
[dev-dependencies]
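The three codecs are individually feature-gated, all on by default (the optional `zstd` dependency doubles as the implicit `zstd` feature). For context, a downstream crate that only needs one codec could trim the defaults roughly as follows; this is a sketch, and the version number is illustrative rather than taken from this commit:

    [dependencies]
    arrow-avro = { version = "50", default-features = false, features = ["snappy"] }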
diff --git a/arrow-avro/src/compression.rs b/arrow-avro/src/compression.rs
index a1a44fc22b..c5c7a6dabc 100644
--- a/arrow-avro/src/compression.rs
+++ b/arrow-avro/src/compression.rs
@@ -15,18 +15,69 @@
// specific language governing permissions and limitations
// under the License.
-use serde::{Deserialize, Serialize};
+use arrow_schema::ArrowError;
+use flate2::read;
+use std::io;
+use std::io::Read;
/// The metadata key used for storing the JSON encoded [`CompressionCodec`]
pub const CODEC_METADATA_KEY: &str = "avro.codec";
-#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
-#[serde(rename_all = "lowercase")]
+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum CompressionCodec {
-    Null,
    Deflate,
-    BZip2,
    Snappy,
-    XZ,
    ZStandard,
}
+
+impl CompressionCodec {
+    pub(crate) fn decompress(&self, block: &[u8]) -> Result<Vec<u8>, ArrowError> {
+        match self {
+            #[cfg(feature = "deflate")]
+            CompressionCodec::Deflate => {
+                let mut decoder = read::DeflateDecoder::new(block);
+                let mut out = Vec::new();
+                decoder.read_to_end(&mut out)?;
+                Ok(out)
+            }
+            #[cfg(not(feature = "deflate"))]
+            CompressionCodec::Deflate => Err(ArrowError::ParseError(
+                "Deflate codec requires deflate feature".to_string(),
+            )),
+            #[cfg(feature = "snappy")]
+            CompressionCodec::Snappy => {
+                // Each compressed block is followed by the 4-byte, big-endian CRC32
+                // checksum of the uncompressed data in the block.
+                let crc = &block[block.len() - 4..];
+                let block = &block[..block.len() - 4];
+
+                let mut decoder = snap::raw::Decoder::new();
+                let decoded = decoder
+                    .decompress_vec(block)
+                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+
+                let checksum = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(&decoded);
+                if checksum != u32::from_be_bytes(crc.try_into().unwrap()) {
+                    return Err(ArrowError::ParseError("Snappy CRC mismatch".to_string()));
+                }
+                Ok(decoded)
+            }
+            #[cfg(not(feature = "snappy"))]
+            CompressionCodec::Snappy => Err(ArrowError::ParseError(
+                "Snappy codec requires snappy feature".to_string(),
+            )),
+
+            #[cfg(feature = "zstd")]
+            CompressionCodec::ZStandard => {
+                let mut decoder = zstd::Decoder::new(block)?;
+                let mut out = Vec::new();
+                decoder.read_to_end(&mut out)?;
+                Ok(out)
+            }
+            #[cfg(not(feature = "zstd"))]
+            CompressionCodec::ZStandard => Err(ArrowError::ParseError(
+                "ZStandard codec requires zstd feature".to_string(),
+            )),
+        }
+    }
+}
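A note on the Snappy arm above: Avro's snappy codec frames each block as raw snappy-compressed bytes followed by the 4-byte, big-endian CRC-32 (ISO-HDLC polynomial) of the uncompressed data, which is exactly what the slicing and checksum comparison implement. A minimal write-side sketch of the same framing, standalone and not part of this commit, using the same `snap` and `crc` crates:

    use crc::{Crc, CRC_32_ISO_HDLC};

    /// Compress `data` with raw snappy and append the big-endian CRC32
    /// of the *uncompressed* payload, per the Avro snappy block layout.
    fn snappy_frame(data: &[u8]) -> Result<Vec<u8>, snap::Error> {
        let mut out = snap::raw::Encoder::new().compress_vec(data)?;
        let crc = Crc::<u32>::new(&CRC_32_ISO_HDLC).checksum(data);
        out.extend_from_slice(&crc.to_be_bytes());
        Ok(out)
    }

Decompressing such a frame with the `Snappy` arm above round-trips back to `data`.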
diff --git a/arrow-avro/src/reader/header.rs b/arrow-avro/src/reader/header.rs
index 97f5d3b8b1..19d48d1f89 100644
--- a/arrow-avro/src/reader/header.rs
+++ b/arrow-avro/src/reader/header.rs
@@ -17,6 +17,7 @@
//! Decoder for [`Header`]
+use crate::compression::{CompressionCodec, CODEC_METADATA_KEY};
use crate::reader::vlq::VLQDecoder;
use crate::schema::Schema;
use arrow_schema::ArrowError;
@@ -55,7 +56,7 @@ impl Header {
    /// Returns an iterator over the meta keys in this header
    pub fn metadata(&self) -> impl Iterator<Item = (&[u8], &[u8])> {
        let mut last = 0;
-        self.meta_offsets.windows(2).map(move |w| {
+        self.meta_offsets.chunks_exact(2).map(move |w| {
            let start = last;
            last = w[1];
            (&self.meta_buf[start..w[0]], &self.meta_buf[w[0]..w[1]])
@@ -72,6 +73,22 @@ impl Header {
    pub fn sync(&self) -> [u8; 16] {
        self.sync
    }
+
+    /// Returns the [`CompressionCodec`] if any
+    pub fn compression(&self) -> Result<Option<CompressionCodec>, ArrowError> {
+        let v = self.get(CODEC_METADATA_KEY);
+
+        match v {
+            None | Some(b"null") => Ok(None),
+            Some(b"deflate") => Ok(Some(CompressionCodec::Deflate)),
+            Some(b"snappy") => Ok(Some(CompressionCodec::Snappy)),
+            Some(b"zstandard") => Ok(Some(CompressionCodec::ZStandard)),
+            Some(v) => Err(ArrowError::ParseError(format!(
+                "Unrecognized compression codec \'{}\'",
+                String::from_utf8_lossy(v)
+            ))),
+        }
+    }
}
/// A decoder for [`Header`]
@@ -305,6 +322,17 @@ mod test {
        );
        let header = decode_file(&arrow_test_data("avro/fixed_length_decimal.avro"));
+
+        let meta: Vec<_> = header
+            .metadata()
+            .map(|(k, _)| std::str::from_utf8(k).unwrap())
+            .collect();
+
+        assert_eq!(
+            meta,
+            &["avro.schema", "org.apache.spark.version", "avro.codec"]
+        );
+
        let schema_json = header.get(SCHEMA_METADATA_KEY).unwrap();
        let expected =
            br#"{"type":"record","name":"topLevelRecord","fields":[{"name":"value","type":[{"type":"fixed","name":"fixed","namespace":"topLevelRecord.value","size":11,"logicalType":"decimal","precision":25,"scale":2},"null"]}]}"#;
        assert_eq!(schema_json, expected);
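With this accessor, codec detection happens once per file, immediately after the header is decoded. Inside the crate the flow looks roughly like the sketch below (using the crate-internal `read_header` from `reader/mod.rs`; error handling elided):

    let file = std::fs::File::open("example.avro").unwrap();
    let mut reader = std::io::BufReader::new(file);
    let header = read_header(&mut reader).unwrap();
    // `None` covers both a missing "avro.codec" key and an explicit "null"
    let codec = header.compression().unwrap();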
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 7769bbbc49..0151db7f85 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -73,6 +73,7 @@ fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = Result<Block,
#[cfg(test)]
mod test {
+    use crate::compression::CompressionCodec;
    use crate::reader::{read_blocks, read_header};
    use crate::test_util::arrow_test_data;
    use std::fs::File;
@@ -80,12 +81,27 @@ mod test {
    #[test]
    fn test_mux() {
-        let file = File::open(arrow_test_data("avro/alltypes_plain.avro")).unwrap();
-        let mut reader = BufReader::new(file);
-        let header = read_header(&mut reader).unwrap();
-        for result in read_blocks(reader) {
-            let block = result.unwrap();
-            assert_eq!(block.sync, header.sync());
+        let files = [
+            "avro/alltypes_plain.avro",
+            "avro/alltypes_plain.snappy.avro",
+            "avro/alltypes_plain.zstandard.avro",
+            "avro/alltypes_nulls_plain.avro",
+        ];
+
+        for file in files {
+            println!("file: {file}");
+            let file = File::open(arrow_test_data(file)).unwrap();
+            let mut reader = BufReader::new(file);
+            let header = read_header(&mut reader).unwrap();
+            let compression = header.compression().unwrap();
+            println!("compression: {compression:?}");
+            for result in read_blocks(reader) {
+                let block = result.unwrap();
+                assert_eq!(block.sync, header.sync());
+                if let Some(c) = compression {
+                    c.decompress(&block.data).unwrap();
+                }
+            }
        }
    }
}
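Generalizing the test loop: per the Avro spec, each file data block carries an object count, a byte length, the (possibly compressed) serialized objects, and the 16-byte sync marker, so decompression composes naturally with `read_blocks`. A crate-internal sketch under that assumption (names taken from this diff, with `Block` exposing the `data` field used above):

    fn decompressed_blocks<R: BufRead>(mut reader: R) -> Result<Vec<Vec<u8>>, ArrowError> {
        let header = read_header(&mut reader)?;
        let compression = header.compression()?;
        read_blocks(reader)
            .map(|b| {
                let block = b?;
                match compression {
                    Some(c) => c.decompress(&block.data),
                    None => Ok(block.data),
                }
            })
            .collect()
    }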
diff --git a/testing b/testing
index d315f79852..e270341fb5 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit d315f7985207d2d67fc2c8e41053e9d97d573f4b
+Subproject commit e270341fb5f3ff785410e6286cc42898e9d6a99c