This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new 2840e3f ARROW-9290: [Rust] [Parquet] Add features to allow opting out of dependencies 2840e3f is described below commit 2840e3f01be65676a511d7eb25b48e46e358ecef Author: Ben Kimock <kimo...@gmail.com> AuthorDate: Thu Jul 2 15:55:59 2020 -0700 ARROW-9290: [Rust] [Parquet] Add features to allow opting out of dependencies Closes #7610 from saethlin/rust-dep-slimming Authored-by: Ben Kimock <kimo...@gmail.com> Signed-off-by: Chao Sun <sunc...@apache.org> --- rust/parquet/Cargo.toml | 24 ++- rust/parquet/src/compression.rs | 353 +++++++++++++++++++++++----------------- rust/parquet/src/errors.rs | 120 +++++++++----- rust/parquet/src/lib.rs | 1 + 4 files changed, 300 insertions(+), 198 deletions(-) diff --git a/rust/parquet/Cargo.toml b/rust/parquet/Cargo.toml index c3b6d0b..68a92aa 100644 --- a/rust/parquet/Cargo.toml +++ b/rust/parquet/Cargo.toml @@ -29,20 +29,28 @@ build = "build.rs" edition = "2018" [dependencies] -parquet-format = "~2.6" -quick-error = "1.2" +parquet-format = "2.6.1" byteorder = "1" thrift = "0.13" -snap = "1.0" -brotli = "3.3" -flate2 = "1.0" -lz4 = "1.23" -zstd = "0.5" +snap = { version = "1.0", optional = true } +brotli = { version = "3.3", optional = true } +flate2 = { version = "1.0", optional = true } +lz4 = { version = "1.23", optional = true } +zstd = { version = "0.5", optional = true } chrono = "0.4" num-bigint = "0.2" -arrow = { path = "../arrow", version = "1.0.0-SNAPSHOT" } +arrow = { path = "../arrow", version = "1.0.0-SNAPSHOT", optional = true } serde_json = { version = "1.0", features = ["preserve_order"] } [dev-dependencies] lazy_static = "1" rand = "0.6" +snap = "1.0" +brotli = "3.3" +flate2 = "1.0" +lz4 = "1.23" +zstd = "0.5" +arrow = { path = "../arrow", version = "1.0.0-SNAPSHOT" } + +[features] +default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd"] diff --git a/rust/parquet/src/compression.rs b/rust/parquet/src/compression.rs index 871bac1..9b4ac71 100644 --- a/rust/parquet/src/compression.rs +++ b/rust/parquet/src/compression.rs @@ -22,7 +22,7 @@ //! //! # Example //! -//! ```rust +//! ```no_run //! use parquet::{basic::Compression, compression::create_codec}; //! //! let mut codec = match create_codec(Compression::SNAPPY) { @@ -40,14 +40,6 @@ //! assert_eq!(output, data); //! ``` -use std::io::{self, Read, Write}; - -use brotli; -use flate2::{read, write, Compression}; -use lz4; -use snap::raw::{decompress_len, max_compress_len, Decoder, Encoder}; -use zstd; - use crate::basic::Compression as CodecType; use crate::errors::{ParquetError, Result}; @@ -70,202 +62,261 @@ pub trait Codec { /// This returns `None` if the codec type is `UNCOMPRESSED`. pub fn create_codec(codec: CodecType) -> Result<Option<Box<Codec>>> { match codec { + #[cfg(any(feature = "brotli", test))] CodecType::BROTLI => Ok(Some(Box::new(BrotliCodec::new()))), + #[cfg(any(feature = "flate2", test))] CodecType::GZIP => Ok(Some(Box::new(GZipCodec::new()))), + #[cfg(any(feature = "snap", test))] CodecType::SNAPPY => Ok(Some(Box::new(SnappyCodec::new()))), + #[cfg(any(feature = "lz4", test))] CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))), + #[cfg(any(feature = "zstd", test))] CodecType::ZSTD => Ok(Some(Box::new(ZSTDCodec::new()))), CodecType::UNCOMPRESSED => Ok(None), _ => Err(nyi_err!("The codec type {} is not supported yet", codec)), } } -/// Codec for Snappy compression format. -pub struct SnappyCodec { - decoder: Decoder, - encoder: Encoder, -} +#[cfg(any(feature = "snap", test))] +mod snappy_codec { + use snap::raw::{decompress_len, max_compress_len, Decoder, Encoder}; -impl SnappyCodec { - /// Creates new Snappy compression codec. - fn new() -> Self { - Self { - decoder: Decoder::new(), - encoder: Encoder::new(), - } - } -} + use crate::compression::Codec; + use crate::errors::Result; -impl Codec for SnappyCodec { - fn decompress( - &mut self, - input_buf: &[u8], - output_buf: &mut Vec<u8>, - ) -> Result<usize> { - let len = decompress_len(input_buf)?; - output_buf.resize(len, 0); - self.decoder - .decompress(input_buf, output_buf) - .map_err(|e| e.into()) + /// Codec for Snappy compression format. + pub struct SnappyCodec { + decoder: Decoder, + encoder: Encoder, } - fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> { - let required_len = max_compress_len(input_buf.len()); - if output_buf.len() < required_len { - output_buf.resize(required_len, 0); + impl SnappyCodec { + /// Creates new Snappy compression codec. + pub(crate) fn new() -> Self { + Self { + decoder: Decoder::new(), + encoder: Encoder::new(), + } } - let n = self.encoder.compress(input_buf, &mut output_buf[..])?; - output_buf.truncate(n); - Ok(()) } -} -/// Codec for GZIP compression algorithm. -pub struct GZipCodec {} + impl Codec for SnappyCodec { + fn decompress( + &mut self, + input_buf: &[u8], + output_buf: &mut Vec<u8>, + ) -> Result<usize> { + let len = decompress_len(input_buf)?; + output_buf.resize(len, 0); + self.decoder + .decompress(input_buf, output_buf) + .map_err(|e| e.into()) + } -impl GZipCodec { - /// Creates new GZIP compression codec. - fn new() -> Self { - Self {} + fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> { + let required_len = max_compress_len(input_buf.len()); + if output_buf.len() < required_len { + output_buf.resize(required_len, 0); + } + let n = self.encoder.compress(input_buf, &mut output_buf[..])?; + output_buf.truncate(n); + Ok(()) + } } } +#[cfg(any(feature = "snap", test))] +pub use snappy_codec::*; + +#[cfg(any(feature = "flate2", test))] +mod gzip_codec { + + use std::io::{Read, Write}; + + use flate2::{read, write, Compression}; + + use crate::compression::Codec; + use crate::errors::Result; -impl Codec for GZipCodec { - fn decompress( - &mut self, - input_buf: &[u8], - output_buf: &mut Vec<u8>, - ) -> Result<usize> { - let mut decoder = read::GzDecoder::new(input_buf); - decoder.read_to_end(output_buf).map_err(|e| e.into()) + /// Codec for GZIP compression algorithm. + pub struct GZipCodec {} + + impl GZipCodec { + /// Creates new GZIP compression codec. + pub(crate) fn new() -> Self { + Self {} + } } - fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> { - let mut encoder = write::GzEncoder::new(output_buf, Compression::default()); - encoder.write_all(input_buf)?; - encoder.try_finish().map_err(|e| e.into()) + impl Codec for GZipCodec { + fn decompress( + &mut self, + input_buf: &[u8], + output_buf: &mut Vec<u8>, + ) -> Result<usize> { + let mut decoder = read::GzDecoder::new(input_buf); + decoder.read_to_end(output_buf).map_err(|e| e.into()) + } + + fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> { + let mut encoder = write::GzEncoder::new(output_buf, Compression::default()); + encoder.write_all(input_buf)?; + encoder.try_finish().map_err(|e| e.into()) + } } } +#[cfg(any(feature = "flate2", test))] +pub use gzip_codec::*; -const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096; -const BROTLI_DEFAULT_COMPRESSION_QUALITY: u32 = 1; // supported levels 0-9 -const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-22 +#[cfg(any(feature = "brotli", test))] +mod brotli_codec { -/// Codec for Brotli compression algorithm. -pub struct BrotliCodec {} + use std::io::{Read, Write}; -impl BrotliCodec { - /// Creates new Brotli compression codec. - fn new() -> Self { - Self {} - } -} + use crate::compression::Codec; + use crate::errors::Result; + + const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096; + const BROTLI_DEFAULT_COMPRESSION_QUALITY: u32 = 1; // supported levels 0-9 + const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-22 -impl Codec for BrotliCodec { - fn decompress( - &mut self, - input_buf: &[u8], - output_buf: &mut Vec<u8>, - ) -> Result<usize> { - brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE) - .read_to_end(output_buf) - .map_err(|e| e.into()) + /// Codec for Brotli compression algorithm. + pub struct BrotliCodec {} + + impl BrotliCodec { + /// Creates new Brotli compression codec. + pub(crate) fn new() -> Self { + Self {} + } } - fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> { - let mut encoder = brotli::CompressorWriter::new( - output_buf, - BROTLI_DEFAULT_BUFFER_SIZE, - BROTLI_DEFAULT_COMPRESSION_QUALITY, - BROTLI_DEFAULT_LG_WINDOW_SIZE, - ); - encoder.write_all(&input_buf[..])?; - encoder.flush().map_err(|e| e.into()) + impl Codec for BrotliCodec { + fn decompress( + &mut self, + input_buf: &[u8], + output_buf: &mut Vec<u8>, + ) -> Result<usize> { + brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE) + .read_to_end(output_buf) + .map_err(|e| e.into()) + } + + fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> { + let mut encoder = brotli::CompressorWriter::new( + output_buf, + BROTLI_DEFAULT_BUFFER_SIZE, + BROTLI_DEFAULT_COMPRESSION_QUALITY, + BROTLI_DEFAULT_LG_WINDOW_SIZE, + ); + encoder.write_all(&input_buf[..])?; + encoder.flush().map_err(|e| e.into()) + } } } +#[cfg(any(feature = "brotli", test))] +pub use brotli_codec::*; + +#[cfg(any(feature = "lz4", test))] +mod lz4_codec { + use std::io::{Read, Write}; + + use crate::compression::Codec; + use crate::errors::Result; -const LZ4_BUFFER_SIZE: usize = 4096; + const LZ4_BUFFER_SIZE: usize = 4096; -/// Codec for LZ4 compression algorithm. -pub struct LZ4Codec {} + /// Codec for LZ4 compression algorithm. + pub struct LZ4Codec {} -impl LZ4Codec { - /// Creates new LZ4 compression codec. - fn new() -> Self { - Self {} + impl LZ4Codec { + /// Creates new LZ4 compression codec. + pub(crate) fn new() -> Self { + Self {} + } } -} -impl Codec for LZ4Codec { - fn decompress( - &mut self, - input_buf: &[u8], - output_buf: &mut Vec<u8>, - ) -> Result<usize> { - let mut decoder = lz4::Decoder::new(input_buf)?; - let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE]; - let mut total_len = 0; - loop { - let len = decoder.read(&mut buffer)?; - if len == 0 { - break; + impl Codec for LZ4Codec { + fn decompress( + &mut self, + input_buf: &[u8], + output_buf: &mut Vec<u8>, + ) -> Result<usize> { + let mut decoder = lz4::Decoder::new(input_buf)?; + let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE]; + let mut total_len = 0; + loop { + let len = decoder.read(&mut buffer)?; + if len == 0 { + break; + } + total_len += len; + output_buf.write_all(&buffer[0..len])?; } - total_len += len; - output_buf.write_all(&buffer[0..len])?; + Ok(total_len) } - Ok(total_len) - } - fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> { - let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?; - let mut from = 0; - loop { - let to = std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len()); - encoder.write_all(&input_buf[from..to])?; - from += LZ4_BUFFER_SIZE; - if from >= input_buf.len() { - break; + fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> { + let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?; + let mut from = 0; + loop { + let to = std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len()); + encoder.write_all(&input_buf[from..to])?; + from += LZ4_BUFFER_SIZE; + if from >= input_buf.len() { + break; + } } + encoder.finish().1.map_err(|e| e.into()) } - encoder.finish().1.map_err(|e| e.into()) } } +#[cfg(any(feature = "lz4", test))] +pub use lz4_codec::*; -/// Codec for Zstandard compression algorithm. -pub struct ZSTDCodec {} +#[cfg(any(feature = "zstd", test))] +mod zstd_codec { + use std::io::{self, Write}; -impl ZSTDCodec { - /// Creates new Zstandard compression codec. - fn new() -> Self { - Self {} - } -} + use crate::compression::Codec; + use crate::errors::Result; -/// Compression level (1-21) for ZSTD. Choose 1 here for better compression speed. -const ZSTD_COMPRESSION_LEVEL: i32 = 1; - -impl Codec for ZSTDCodec { - fn decompress( - &mut self, - input_buf: &[u8], - output_buf: &mut Vec<u8>, - ) -> Result<usize> { - let mut decoder = zstd::Decoder::new(input_buf)?; - match io::copy(&mut decoder, output_buf) { - Ok(n) => Ok(n as usize), - Err(e) => Err(e.into()), + /// Codec for Zstandard compression algorithm. + pub struct ZSTDCodec {} + + impl ZSTDCodec { + /// Creates new Zstandard compression codec. + pub(crate) fn new() -> Self { + Self {} } } - fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> { - let mut encoder = zstd::Encoder::new(output_buf, ZSTD_COMPRESSION_LEVEL)?; - encoder.write_all(&input_buf[..])?; - match encoder.finish() { - Ok(_) => Ok(()), - Err(e) => Err(e.into()), + /// Compression level (1-21) for ZSTD. Choose 1 here for better compression speed. + const ZSTD_COMPRESSION_LEVEL: i32 = 1; + + impl Codec for ZSTDCodec { + fn decompress( + &mut self, + input_buf: &[u8], + output_buf: &mut Vec<u8>, + ) -> Result<usize> { + let mut decoder = zstd::Decoder::new(input_buf)?; + match io::copy(&mut decoder, output_buf) { + Ok(n) => Ok(n as usize), + Err(e) => Err(e.into()), + } + } + + fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> { + let mut encoder = zstd::Encoder::new(output_buf, ZSTD_COMPRESSION_LEVEL)?; + encoder.write_all(&input_buf[..])?; + match encoder.finish() { + Ok(_) => Ok(()), + Err(e) => Err(e.into()), + } } } } +#[cfg(any(feature = "zstd", test))] +pub use zstd_codec::*; #[cfg(test)] mod tests { diff --git a/rust/parquet/src/errors.rs b/rust/parquet/src/errors.rs index e532834..18673a4 100644 --- a/rust/parquet/src/errors.rs +++ b/rust/parquet/src/errors.rs @@ -19,46 +19,87 @@ use std::{cell, convert, io, result, str}; +#[cfg(any(feature = "arrow", test))] use arrow::error::ArrowError; -use quick_error::quick_error; -use snap; -use thrift; - -quick_error! { - /// Set of errors that can be produced during different operations in Parquet. - #[derive(Debug, PartialEq)] - pub enum ParquetError { - /// General Parquet error. - /// Returned when code violates normal workflow of working with Parquet files. - General(message: String) { - display("Parquet error: {}", message) - from(e: io::Error) -> (format!("underlying IO error: {}", e)) - from(e: snap::Error) -> (format!("underlying snap error: {}", e)) - from(e: thrift::Error) -> (format!("underlying Thrift error: {}", e)) - from(e: cell::BorrowMutError) -> (format!("underlying borrow error: {}", e)) - from(e: str::Utf8Error) -> (format!("underlying utf8 error: {}", e)) - } - /// "Not yet implemented" Parquet error. - /// Returned when functionality is not yet available. - NYI(message: String) { - display("NYI: {}", message) - } - /// "End of file" Parquet error. - /// Returned when IO related failures occur, e.g. when there are not enough bytes to - /// decode. - EOF(message: String) { - display("EOF: {}", message) - } - /// Arrow error. - /// Returned when reading into arrow or writing from arrow. - ArrowError(message: String) { - display("Arrow: {}", message) - from(e: ArrowError) -> (format!("underlying Arrow error: {:?}", e)) - } - IndexOutOfBound(index: usize, bound: usize) { - display("Index {} out of bound: {}", index, bound) - } - } + +#[derive(Debug, PartialEq)] +pub enum ParquetError { + /// General Parquet error. + /// Returned when code violates normal workflow of working with Parquet files. + General(String), + /// "Not yet implemented" Parquet error. + /// Returned when functionality is not yet available. + NYI(String), + /// "End of file" Parquet error. + /// Returned when IO related failures occur, e.g. when there are not enough bytes to + /// decode. + EOF(String), + #[cfg(any(feature = "arrow", test))] + /// Arrow error. + /// Returned when reading into arrow or writing from arrow. + ArrowError(String), + IndexOutOfBound(usize, usize), +} + +impl std::fmt::Display for ParquetError { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + match *self { + ParquetError::General(ref message) => { + write!(fmt, "Parquet error: {}", message) + } + ParquetError::NYI(ref message) => write!(fmt, "NYI: {}", message), + ParquetError::EOF(ref message) => write!(fmt, "EOF: {}", message), + #[cfg(any(feature = "arrow", test))] + ParquetError::ArrowError(ref message) => write!(fmt, "Arrow: {}", message), + ParquetError::IndexOutOfBound(ref index, ref bound) => { + write!(fmt, "Index {} out of bound: {}", index, bound) + } + } + } +} + +impl std::error::Error for ParquetError { + fn cause(&self) -> Option<&::std::error::Error> { + None + } +} + +impl From<io::Error> for ParquetError { + fn from(e: io::Error) -> ParquetError { + ParquetError::General(format!("underlying IO error: {}", e)) + } +} + +#[cfg(any(feature = "snap", test))] +impl From<snap::Error> for ParquetError { + fn from(e: snap::Error) -> ParquetError { + ParquetError::General(format!("underlying snap error: {}", e)) + } +} + +impl From<thrift::Error> for ParquetError { + fn from(e: thrift::Error) -> ParquetError { + ParquetError::General(format!("underlying Thrift error: {}", e)) + } +} + +impl From<cell::BorrowMutError> for ParquetError { + fn from(e: cell::BorrowMutError) -> ParquetError { + ParquetError::General(format!("underlying borrow error: {}", e)) + } +} + +impl From<str::Utf8Error> for ParquetError { + fn from(e: str::Utf8Error) -> ParquetError { + ParquetError::General(format!("underlying utf8 error: {}", e)) + } +} + +#[cfg(any(feature = "arrow", test))] +impl From<ArrowError> for ParquetError { + fn from(e: ArrowError) -> ParquetError { + ParquetError::ArrowError(format!("underlying Arrow error: {}", e)) + } } /// A specialized `Result` for Parquet errors. @@ -97,6 +138,7 @@ macro_rules! eof_err { // ---------------------------------------------------------------------- // Convert parquet error into other errors +#[cfg(any(feature = "arrow", test))] impl Into<ArrowError> for ParquetError { fn into(self) -> ArrowError { ArrowError::ParquetError(format!("{}", self)) diff --git a/rust/parquet/src/lib.rs b/rust/parquet/src/lib.rs index 8d23e89..6d5a6b3 100644 --- a/rust/parquet/src/lib.rs +++ b/rust/parquet/src/lib.rs @@ -31,6 +31,7 @@ pub use self::util::memory; #[macro_use] mod util; +#[cfg(any(feature = "arrow", test))] pub mod arrow; pub mod column; pub mod compression;