This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e1247e8c Added support for LZ4_RAW compression. (#1604) (#2943)
4e1247e8c is described below

commit 4e1247e8c03f36940a912256e2d94f49a1b581df
Author: Adrián Gallego Castellanos <[email protected]>
AuthorDate: Thu Oct 27 22:59:25 2022 +0200

    Added support for LZ4_RAW compression. (#1604) (#2943)
    
    * Added support for LZ4_RAW compression. (#1604)
    
    * This adds the implementation of LZ4_RAW codec by using lz4 block 
compression algorithm. (#1604)
    * This commit uses 
https://stackoverflow.com/questions/25740471/lz4-library-decompressed-data-upper-bound-size-estimation
 formula to estimate the uncompressed size. As said in that thread, this 
algorithm over-estimates the size, but it is probably the best we can get with 
the current decompress API, as the size of an Arrow LZ4_RAW block is not 
prepended to the block.
    * Another option would be to take the C++ approach and bypass the API 
(https://github.com/apache/arrow/blob/master/cpp/src/arrow/util/compression_lz4.cc#L343).
 This approach consists of relying on the output_buffer capacity to guess the 
uncompress_size. This works as `serialized_reader.rs` already knows the 
uncompressed_size, as it reads it from the page header, and allocates the 
output_buffer with a capacity equal to the uncompress_size 
(https://github.com/marioloko/arrow-rs/blob/maste [...]
        1. It is too hacky.
        2. It will limit the use cases of the `decompress` API, as the caller 
will need to know the right uncompressed_size in advance in order to allocate 
the output buffer.
        3. It is not compatible with the current set of tests. However, new 
tests can be created.
    
    * Clippy
    
    * Add integration test
    
    Co-authored-by: Adrián Gallego Castellanos <[email protected]>
    Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
 parquet/src/arrow/arrow_reader/mod.rs | 31 +++++++++++++++++
 parquet/src/basic.rs                  |  4 +++
 parquet/src/compression.rs            | 64 +++++++++++++++++++++++++++++++++++
 3 files changed, 99 insertions(+)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs 
b/parquet/src/arrow/arrow_reader/mod.rs
index 51b09302c..7f68b07eb 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -2390,4 +2390,35 @@ mod tests {
             assert_eq!(full.column(idx), projected.column(0));
         }
     }
+
+    #[test]
+    fn test_read_lz4_raw() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/lz4_raw_compressed.parquet", testdata);
+        let file = File::open(&path).unwrap();
+
+        let batches = ParquetRecordBatchReader::try_new(file, 1024)
+            .unwrap()
+            .collect::<ArrowResult<Vec<_>>>()
+            .unwrap();
+        assert_eq!(batches.len(), 1);
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_columns(), 3);
+        assert_eq!(batch.num_rows(), 4);
+
+        // https://github.com/apache/parquet-testing/pull/18
+        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
+        assert_eq!(
+            a.values(),
+            &[1593604800, 1593604800, 1593604801, 1593604801]
+        );
+
+        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
+        let a: Vec<_> = a.iter().flatten().collect();
+        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
+
+        let a: &Float64Array = 
batch.column(2).as_any().downcast_ref().unwrap();
+        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
+    }
 }
diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs
index b0f591c7a..96cdd537d 100644
--- a/parquet/src/basic.rs
+++ b/parquet/src/basic.rs
@@ -282,6 +282,7 @@ pub enum Encoding {
 
 /// Supported compression algorithms.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[allow(non_camel_case_types)]
 pub enum Compression {
     UNCOMPRESSED,
     SNAPPY,
@@ -290,6 +291,7 @@ pub enum Compression {
     BROTLI,
     LZ4,
     ZSTD,
+    LZ4_RAW,
 }
 
 // ----------------------------------------------------------------------
@@ -826,6 +828,7 @@ impl TryFrom<parquet::CompressionCodec> for Compression {
             parquet::CompressionCodec::BROTLI => Compression::BROTLI,
             parquet::CompressionCodec::LZ4 => Compression::LZ4,
             parquet::CompressionCodec::ZSTD => Compression::ZSTD,
+            parquet::CompressionCodec::LZ4_RAW => Compression::LZ4_RAW,
             _ => {
                 return Err(general_err!(
                     "unexpected parquet compression codec: {}",
@@ -846,6 +849,7 @@ impl From<Compression> for parquet::CompressionCodec {
             Compression::BROTLI => parquet::CompressionCodec::BROTLI,
             Compression::LZ4 => parquet::CompressionCodec::LZ4,
             Compression::ZSTD => parquet::CompressionCodec::ZSTD,
+            Compression::LZ4_RAW => parquet::CompressionCodec::LZ4_RAW,
         }
     }
 }
diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs
index ee5141cbe..f110e3d82 100644
--- a/parquet/src/compression.rs
+++ b/parquet/src/compression.rs
@@ -77,6 +77,8 @@ pub fn create_codec(codec: CodecType) -> 
Result<Option<Box<dyn Codec>>> {
         CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))),
         #[cfg(any(feature = "zstd", test))]
         CodecType::ZSTD => Ok(Some(Box::new(ZSTDCodec::new()))),
+        #[cfg(any(feature = "lz4", test))]
+        CodecType::LZ4_RAW => Ok(Some(Box::new(LZ4RawCodec::new()))),
         CodecType::UNCOMPRESSED => Ok(None),
         _ => Err(nyi_err!("The codec type {} is not supported yet", codec)),
     }
@@ -325,6 +327,63 @@ mod zstd_codec {
 #[cfg(any(feature = "zstd", test))]
 pub use zstd_codec::*;
 
+#[cfg(any(feature = "lz4", test))]
+mod lz4_raw_codec {
+    use crate::compression::Codec;
+    use crate::errors::Result;
+
+    /// Codec for LZ4 Raw compression algorithm.
+    pub struct LZ4RawCodec {}
+
+    impl LZ4RawCodec {
+        /// Creates new LZ4 Raw compression codec.
+        pub(crate) fn new() -> Self {
+            Self {}
+        }
+    }
+
+    // Compute max LZ4 uncompress size.
+    // Check 
https://stackoverflow.com/questions/25740471/lz4-library-decompressed-data-upper-bound-size-estimation
+    fn max_uncompressed_size(compressed_size: usize) -> usize {
+        (compressed_size << 8) - compressed_size - 2526
+    }
+
+    impl Codec for LZ4RawCodec {
+        fn decompress(
+            &mut self,
+            input_buf: &[u8],
+            output_buf: &mut Vec<u8>,
+        ) -> Result<usize> {
+            let offset = output_buf.len();
+            let required_len = max_uncompressed_size(input_buf.len());
+            output_buf.resize(offset + required_len, 0);
+            let required_len: i32 = required_len.try_into().unwrap();
+            match lz4::block::decompress_to_buffer(input_buf, 
Some(required_len), &mut output_buf[offset..]) {
+                Ok(n) => {
+                    output_buf.truncate(offset + n);
+                    Ok(n)   
+                },
+                Err(e) => Err(e.into()),
+            }
+        }
+
+        fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> 
Result<()> {
+            let offset = output_buf.len();
+            let required_len = lz4::block::compress_bound(input_buf.len())?;
+            output_buf.resize(offset + required_len, 0);
+            match lz4::block::compress_to_buffer(input_buf, None, false, &mut 
output_buf[offset..]) {
+                Ok(n) => {
+                    output_buf.truncate(offset + n);
+                    Ok(())
+                },
+                Err(e) => Err(e.into()),
+            }
+        }
+    }
+}
+#[cfg(any(feature = "lz4", test))]
+pub use lz4_raw_codec::*;
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -416,4 +475,9 @@ mod tests {
     fn test_codec_zstd() {
         test_codec(CodecType::ZSTD);
     }
+
+    #[test]
+    fn test_codec_lz4_raw() {
+        test_codec(CodecType::LZ4_RAW);
+    }
 }

Reply via email to