This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 202e9a1c29 Avro block decompression (#5306)
202e9a1c29 is described below

commit 202e9a1c29ef93d0bf4c42ea8c4c3733b44721ab
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Jan 17 16:23:42 2024 +0000

    Avro block decompression (#5306)
    
    * Avro block decompression
    
    * Clippy
    
    * Update arrow-avro/src/reader/mod.rs
    
    Co-authored-by: Marco Neumann <[email protected]>
    
    ---------
    
    Co-authored-by: Marco Neumann <[email protected]>
---
 arrow-avro/Cargo.toml           | 10 +++++++
 arrow-avro/src/compression.rs   | 63 +++++++++++++++++++++++++++++++++++++----
 arrow-avro/src/reader/header.rs | 30 +++++++++++++++++++-
 arrow-avro/src/reader/mod.rs    | 28 ++++++++++++++----
 testing                         |  2 +-
 5 files changed, 119 insertions(+), 14 deletions(-)

diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
index 9575874c41..d2436f0c15 100644
--- a/arrow-avro/Cargo.toml
+++ b/arrow-avro/Cargo.toml
@@ -33,6 +33,11 @@ name = "arrow_avro"
 path = "src/lib.rs"
 bench = false
 
+[features]
+default = ["deflate", "snappy", "zstd"]
+deflate = ["flate2"]
+snappy = ["snap", "crc"]
+
 [dependencies]
 arrow-array = { workspace = true  }
 arrow-buffer = { workspace = true  }
@@ -41,6 +46,11 @@ arrow-data = { workspace = true  }
 arrow-schema = { workspace = true  }
 serde_json = { version = "1.0", default-features = false, features = ["std"] }
 serde = { version = "1.0.188", features = ["derive"] }
+flate2 = { version = "1.0", default-features = false, features = ["rust_backend"], optional = true }
+snap = { version = "1.0", default-features = false, optional = true }
+zstd = { version = "0.13", default-features = false, optional = true }
+crc = { version = "3.0", optional = true }
+
 
 [dev-dependencies]
 
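A note on the new `[features]` table: there is no explicit `zstd = [...]` entry because Cargo derives an implicit feature from the optional `zstd` dependency, so `default = ["deflate", "snappy", "zstd"]` still resolves. A downstream crate that wants only one codec could opt out of the defaults; a hypothetical example (version number illustrative, not part of this commit):

    [dependencies]
    # default-features = false drops deflate/snappy/zstd; re-enable only what you need
    arrow-avro = { version = "50.0", default-features = false, features = ["zstd"] }
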
diff --git a/arrow-avro/src/compression.rs b/arrow-avro/src/compression.rs
index a1a44fc22b..c5c7a6dabc 100644
--- a/arrow-avro/src/compression.rs
+++ b/arrow-avro/src/compression.rs
@@ -15,18 +15,69 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use serde::{Deserialize, Serialize};
+use arrow_schema::ArrowError;
+use flate2::read;
+use std::io;
+use std::io::Read;
 
 /// The metadata key used for storing the JSON encoded [`CompressionCodec`]
 pub const CODEC_METADATA_KEY: &str = "avro.codec";
 
-#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
-#[serde(rename_all = "lowercase")]
+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
 pub enum CompressionCodec {
-    Null,
     Deflate,
-    BZip2,
     Snappy,
-    XZ,
     ZStandard,
 }
+
+impl CompressionCodec {
+    pub(crate) fn decompress(&self, block: &[u8]) -> Result<Vec<u8>, ArrowError> {
+        match self {
+            #[cfg(feature = "deflate")]
+            CompressionCodec::Deflate => {
+                let mut decoder = read::DeflateDecoder::new(block);
+                let mut out = Vec::new();
+                decoder.read_to_end(&mut out)?;
+                Ok(out)
+            }
+            #[cfg(not(feature = "deflate"))]
+            CompressionCodec::Deflate => Err(ArrowError::ParseError(
+                "Deflate codec requires deflate feature".to_string(),
+            )),
+            #[cfg(feature = "snappy")]
+            CompressionCodec::Snappy => {
+                // Each compressed block is followed by the 4-byte, big-endian CRC32
+                // checksum of the uncompressed data in the block.
+                let crc = &block[block.len() - 4..];
+                let block = &block[..block.len() - 4];
+
+                let mut decoder = snap::raw::Decoder::new();
+                let decoded = decoder
+                    .decompress_vec(block)
+                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+
+                let checksum = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC).checksum(&decoded);
+                if checksum != u32::from_be_bytes(crc.try_into().unwrap()) {
+                    return Err(ArrowError::ParseError("Snappy CRC 
mismatch".to_string()));
+                }
+                Ok(decoded)
+            }
+            #[cfg(not(feature = "snappy"))]
+            CompressionCodec::Snappy => Err(ArrowError::ParseError(
+                "Snappy codec requires snappy feature".to_string(),
+            )),
+
+            #[cfg(feature = "zstd")]
+            CompressionCodec::ZStandard => {
+                let mut decoder = zstd::Decoder::new(block)?;
+                let mut out = Vec::new();
+                decoder.read_to_end(&mut out)?;
+                Ok(out)
+            }
+            #[cfg(not(feature = "zstd"))]
+            CompressionCodec::ZStandard => Err(ArrowError::ParseError(
+                "ZStandard codec requires zstd feature".to_string(),
+            )),
+        }
+    }
+}
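For context, the Snappy arm above implements the framing required by the Avro container spec: each compressed block is followed by the 4-byte, big-endian CRC-32 (ISO-HDLC polynomial) of the *uncompressed* data. A standalone sketch of that trailer check, using the same `crc` crate API as the patch (the helper name is hypothetical):

    use crc::{Crc, CRC_32_ISO_HDLC};

    /// Hypothetical helper: verify `decoded` against the 4-byte, big-endian
    /// CRC-32 trailer at the end of an Avro snappy block.
    fn verify_snappy_trailer(block: &[u8], decoded: &[u8]) -> bool {
        // Split off the 4-byte trailer that follows the compressed payload
        let (_, trailer) = block.split_at(block.len() - 4);
        let expected = u32::from_be_bytes(trailer.try_into().unwrap());
        // The checksum is computed over the uncompressed bytes
        Crc::<u32>::new(&CRC_32_ISO_HDLC).checksum(decoded) == expected
    }
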
diff --git a/arrow-avro/src/reader/header.rs b/arrow-avro/src/reader/header.rs
index 97f5d3b8b1..19d48d1f89 100644
--- a/arrow-avro/src/reader/header.rs
+++ b/arrow-avro/src/reader/header.rs
@@ -17,6 +17,7 @@
 
 //! Decoder for [`Header`]
 
+use crate::compression::{CompressionCodec, CODEC_METADATA_KEY};
 use crate::reader::vlq::VLQDecoder;
 use crate::schema::Schema;
 use arrow_schema::ArrowError;
@@ -55,7 +56,7 @@ impl Header {
     /// Returns an iterator over the meta keys in this header
     pub fn metadata(&self) -> impl Iterator<Item = (&[u8], &[u8])> {
         let mut last = 0;
-        self.meta_offsets.windows(2).map(move |w| {
+        self.meta_offsets.chunks_exact(2).map(move |w| {
             let start = last;
             last = w[1];
             (&self.meta_buf[start..w[0]], &self.meta_buf[w[0]..w[1]])
@@ -72,6 +73,22 @@ impl Header {
     pub fn sync(&self) -> [u8; 16] {
         self.sync
     }
+
+    /// Returns the [`CompressionCodec`] if any
+    pub fn compression(&self) -> Result<Option<CompressionCodec>, ArrowError> {
+        let v = self.get(CODEC_METADATA_KEY);
+
+        match v {
+            None | Some(b"null") => Ok(None),
+            Some(b"deflate") => Ok(Some(CompressionCodec::Deflate)),
+            Some(b"snappy") => Ok(Some(CompressionCodec::Snappy)),
+            Some(b"zstandard") => Ok(Some(CompressionCodec::ZStandard)),
+            Some(v) => Err(ArrowError::ParseError(format!(
+                "Unrecognized compression codec \'{}\'",
+                String::from_utf8_lossy(v)
+            ))),
+        }
+    }
 }
 
 /// A decoder for [`Header`]
@@ -305,6 +322,17 @@ mod test {
         );
 
         let header = decode_file(&arrow_test_data("avro/fixed_length_decimal.avro"));
+
+        let meta: Vec<_> = header
+            .metadata()
+            .map(|(k, _)| std::str::from_utf8(k).unwrap())
+            .collect();
+
+        assert_eq!(
+            meta,
+            &["avro.schema", "org.apache.spark.version", "avro.codec"]
+        );
+
         let schema_json = header.get(SCHEMA_METADATA_KEY).unwrap();
         let expected = br#"{"type":"record","name":"topLevelRecord","fields":[{"name":"value","type":[{"type":"fixed","name":"fixed","namespace":"topLevelRecord.value","size":11,"logicalType":"decimal","precision":25,"scale":2},"null"]}]}"#;
         assert_eq!(schema_json, expected);
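The `windows(2)` to `chunks_exact(2)` change in `metadata()` above is a genuine bug fix: `meta_offsets` holds one (key_end, value_end) pair per metadata entry, and overlapping windows would also yield the spurious (value_end, key_end) spans between entries. A minimal illustration with made-up offsets:

    fn main() {
        // Two entries, offsets stored as (key_end, value_end) pairs
        let offsets: Vec<usize> = vec![2, 5, 9, 12];

        // chunks_exact(2) yields the disjoint pairs [2, 5] and [9, 12]: correct
        assert_eq!(offsets.chunks_exact(2).count(), 2);

        // windows(2) would also yield the bogus overlap [5, 9]
        assert_eq!(offsets.windows(2).count(), 3);
    }
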
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 7769bbbc49..0151db7f85 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -73,6 +73,7 @@ fn read_blocks<R: BufRead>(mut reader: R) -> impl Iterator<Item = Result<Block,
 
 #[cfg(test)]
 mod test {
+    use crate::compression::CompressionCodec;
     use crate::reader::{read_blocks, read_header};
     use crate::test_util::arrow_test_data;
     use std::fs::File;
@@ -80,12 +81,27 @@ mod test {
 
     #[test]
     fn test_mux() {
-        let file = File::open(arrow_test_data("avro/alltypes_plain.avro")).unwrap();
-        let mut reader = BufReader::new(file);
-        let header = read_header(&mut reader).unwrap();
-        for result in read_blocks(reader) {
-            let block = result.unwrap();
-            assert_eq!(block.sync, header.sync());
+        let files = [
+            "avro/alltypes_plain.avro",
+            "avro/alltypes_plain.snappy.avro",
+            "avro/alltypes_plain.zstandard.avro",
+            "avro/alltypes_nulls_plain.avro",
+        ];
+
+        for file in files {
+            println!("file: {file}");
+            let file = File::open(arrow_test_data(file)).unwrap();
+            let mut reader = BufReader::new(file);
+            let header = read_header(&mut reader).unwrap();
+            let compression = header.compression().unwrap();
+            println!("compression: {compression:?}");
+            for result in read_blocks(reader) {
+                let block = result.unwrap();
+                assert_eq!(block.sync, header.sync());
+                if let Some(c) = compression {
+                    c.decompress(&block.data).unwrap();
+                }
+            }
         }
     }
 }
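Outside the test, the same pieces compose into an end-to-end read loop. A hypothetical sketch (not in this commit; it assumes `Block::data` is a `Vec<u8>` and reuses the crate-private `read_header`/`read_blocks` seen above):

    use arrow_schema::ArrowError;

    // Hypothetical: read a container file and eagerly decompress every block.
    fn decompressed_blocks<R: std::io::BufRead>(
        mut reader: R,
    ) -> Result<Vec<Vec<u8>>, ArrowError> {
        let header = read_header(&mut reader)?;
        let compression = header.compression()?;
        read_blocks(reader)
            .map(|b| {
                let block = b?;
                match compression {
                    // Decompress using the codec declared in the header
                    Some(c) => c.decompress(&block.data),
                    // "null" codec: block data is already the raw payload
                    None => Ok(block.data),
                }
            })
            .collect()
    }
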
diff --git a/testing b/testing
index d315f79852..e270341fb5 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit d315f7985207d2d67fc2c8e41053e9d97d573f4b
+Subproject commit e270341fb5f3ff785410e6286cc42898e9d6a99c
