This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 2840e3f ARROW-9290: [Rust] [Parquet] Add features to allow opting out
of dependencies
2840e3f is described below
commit 2840e3f01be65676a511d7eb25b48e46e358ecef
Author: Ben Kimock <[email protected]>
AuthorDate: Thu Jul 2 15:55:59 2020 -0700
ARROW-9290: [Rust] [Parquet] Add features to allow opting out of
dependencies
Closes #7610 from saethlin/rust-dep-slimming
Authored-by: Ben Kimock <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
---
rust/parquet/Cargo.toml | 24 ++-
rust/parquet/src/compression.rs | 353 +++++++++++++++++++++++-----------------
rust/parquet/src/errors.rs | 120 +++++++++-----
rust/parquet/src/lib.rs | 1 +
4 files changed, 300 insertions(+), 198 deletions(-)
diff --git a/rust/parquet/Cargo.toml b/rust/parquet/Cargo.toml
index c3b6d0b..68a92aa 100644
--- a/rust/parquet/Cargo.toml
+++ b/rust/parquet/Cargo.toml
@@ -29,20 +29,28 @@ build = "build.rs"
edition = "2018"
[dependencies]
-parquet-format = "~2.6"
-quick-error = "1.2"
+parquet-format = "2.6.1"
byteorder = "1"
thrift = "0.13"
-snap = "1.0"
-brotli = "3.3"
-flate2 = "1.0"
-lz4 = "1.23"
-zstd = "0.5"
+snap = { version = "1.0", optional = true }
+brotli = { version = "3.3", optional = true }
+flate2 = { version = "1.0", optional = true }
+lz4 = { version = "1.23", optional = true }
+zstd = { version = "0.5", optional = true }
chrono = "0.4"
num-bigint = "0.2"
-arrow = { path = "../arrow", version = "1.0.0-SNAPSHOT" }
+arrow = { path = "../arrow", version = "1.0.0-SNAPSHOT", optional = true }
serde_json = { version = "1.0", features = ["preserve_order"] }
[dev-dependencies]
lazy_static = "1"
rand = "0.6"
+snap = "1.0"
+brotli = "3.3"
+flate2 = "1.0"
+lz4 = "1.23"
+zstd = "0.5"
+arrow = { path = "../arrow", version = "1.0.0-SNAPSHOT" }
+
+[features]
+default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd"]
diff --git a/rust/parquet/src/compression.rs b/rust/parquet/src/compression.rs
index 871bac1..9b4ac71 100644
--- a/rust/parquet/src/compression.rs
+++ b/rust/parquet/src/compression.rs
@@ -22,7 +22,7 @@
//!
//! # Example
//!
-//! ```rust
+//! ```no_run
//! use parquet::{basic::Compression, compression::create_codec};
//!
//! let mut codec = match create_codec(Compression::SNAPPY) {
@@ -40,14 +40,6 @@
//! assert_eq!(output, data);
//! ```
-use std::io::{self, Read, Write};
-
-use brotli;
-use flate2::{read, write, Compression};
-use lz4;
-use snap::raw::{decompress_len, max_compress_len, Decoder, Encoder};
-use zstd;
-
use crate::basic::Compression as CodecType;
use crate::errors::{ParquetError, Result};
@@ -70,202 +62,261 @@ pub trait Codec {
/// This returns `None` if the codec type is `UNCOMPRESSED`.
pub fn create_codec(codec: CodecType) -> Result<Option<Box<Codec>>> {
match codec {
+ #[cfg(any(feature = "brotli", test))]
CodecType::BROTLI => Ok(Some(Box::new(BrotliCodec::new()))),
+ #[cfg(any(feature = "flate2", test))]
CodecType::GZIP => Ok(Some(Box::new(GZipCodec::new()))),
+ #[cfg(any(feature = "snap", test))]
CodecType::SNAPPY => Ok(Some(Box::new(SnappyCodec::new()))),
+ #[cfg(any(feature = "lz4", test))]
CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))),
+ #[cfg(any(feature = "zstd", test))]
CodecType::ZSTD => Ok(Some(Box::new(ZSTDCodec::new()))),
CodecType::UNCOMPRESSED => Ok(None),
_ => Err(nyi_err!("The codec type {} is not supported yet", codec)),
}
}
-/// Codec for Snappy compression format.
-pub struct SnappyCodec {
- decoder: Decoder,
- encoder: Encoder,
-}
+#[cfg(any(feature = "snap", test))]
+mod snappy_codec {
+ use snap::raw::{decompress_len, max_compress_len, Decoder, Encoder};
-impl SnappyCodec {
- /// Creates new Snappy compression codec.
- fn new() -> Self {
- Self {
- decoder: Decoder::new(),
- encoder: Encoder::new(),
- }
- }
-}
+ use crate::compression::Codec;
+ use crate::errors::Result;
-impl Codec for SnappyCodec {
- fn decompress(
- &mut self,
- input_buf: &[u8],
- output_buf: &mut Vec<u8>,
- ) -> Result<usize> {
- let len = decompress_len(input_buf)?;
- output_buf.resize(len, 0);
- self.decoder
- .decompress(input_buf, output_buf)
- .map_err(|e| e.into())
+ /// Codec for Snappy compression format.
+ pub struct SnappyCodec {
+ decoder: Decoder,
+ encoder: Encoder,
}
- fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
- let required_len = max_compress_len(input_buf.len());
- if output_buf.len() < required_len {
- output_buf.resize(required_len, 0);
+ impl SnappyCodec {
+ /// Creates new Snappy compression codec.
+ pub(crate) fn new() -> Self {
+ Self {
+ decoder: Decoder::new(),
+ encoder: Encoder::new(),
+ }
}
- let n = self.encoder.compress(input_buf, &mut output_buf[..])?;
- output_buf.truncate(n);
- Ok(())
}
-}
-/// Codec for GZIP compression algorithm.
-pub struct GZipCodec {}
+ impl Codec for SnappyCodec {
+ fn decompress(
+ &mut self,
+ input_buf: &[u8],
+ output_buf: &mut Vec<u8>,
+ ) -> Result<usize> {
+ let len = decompress_len(input_buf)?;
+ output_buf.resize(len, 0);
+ self.decoder
+ .decompress(input_buf, output_buf)
+ .map_err(|e| e.into())
+ }
-impl GZipCodec {
- /// Creates new GZIP compression codec.
- fn new() -> Self {
- Self {}
+ fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
+ let required_len = max_compress_len(input_buf.len());
+ if output_buf.len() < required_len {
+ output_buf.resize(required_len, 0);
+ }
+ let n = self.encoder.compress(input_buf, &mut output_buf[..])?;
+ output_buf.truncate(n);
+ Ok(())
+ }
}
}
+#[cfg(any(feature = "snap", test))]
+pub use snappy_codec::*;
+
+#[cfg(any(feature = "flate2", test))]
+mod gzip_codec {
+
+ use std::io::{Read, Write};
+
+ use flate2::{read, write, Compression};
+
+ use crate::compression::Codec;
+ use crate::errors::Result;
-impl Codec for GZipCodec {
- fn decompress(
- &mut self,
- input_buf: &[u8],
- output_buf: &mut Vec<u8>,
- ) -> Result<usize> {
- let mut decoder = read::GzDecoder::new(input_buf);
- decoder.read_to_end(output_buf).map_err(|e| e.into())
+ /// Codec for GZIP compression algorithm.
+ pub struct GZipCodec {}
+
+ impl GZipCodec {
+ /// Creates new GZIP compression codec.
+ pub(crate) fn new() -> Self {
+ Self {}
+ }
}
- fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
- let mut encoder = write::GzEncoder::new(output_buf, Compression::default());
- encoder.write_all(input_buf)?;
- encoder.try_finish().map_err(|e| e.into())
+ impl Codec for GZipCodec {
+ fn decompress(
+ &mut self,
+ input_buf: &[u8],
+ output_buf: &mut Vec<u8>,
+ ) -> Result<usize> {
+ let mut decoder = read::GzDecoder::new(input_buf);
+ decoder.read_to_end(output_buf).map_err(|e| e.into())
+ }
+
+ fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
+ let mut encoder = write::GzEncoder::new(output_buf, Compression::default());
+ encoder.write_all(input_buf)?;
+ encoder.try_finish().map_err(|e| e.into())
+ }
}
}
+#[cfg(any(feature = "flate2", test))]
+pub use gzip_codec::*;
-const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
-const BROTLI_DEFAULT_COMPRESSION_QUALITY: u32 = 1; // supported levels 0-9
-const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-22
+#[cfg(any(feature = "brotli", test))]
+mod brotli_codec {
-/// Codec for Brotli compression algorithm.
-pub struct BrotliCodec {}
+ use std::io::{Read, Write};
-impl BrotliCodec {
- /// Creates new Brotli compression codec.
- fn new() -> Self {
- Self {}
- }
-}
+ use crate::compression::Codec;
+ use crate::errors::Result;
+
+ const BROTLI_DEFAULT_BUFFER_SIZE: usize = 4096;
+ const BROTLI_DEFAULT_COMPRESSION_QUALITY: u32 = 1; // supported levels 0-9
+ const BROTLI_DEFAULT_LG_WINDOW_SIZE: u32 = 22; // recommended between 20-22
-impl Codec for BrotliCodec {
- fn decompress(
- &mut self,
- input_buf: &[u8],
- output_buf: &mut Vec<u8>,
- ) -> Result<usize> {
- brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE)
- .read_to_end(output_buf)
- .map_err(|e| e.into())
+ /// Codec for Brotli compression algorithm.
+ pub struct BrotliCodec {}
+
+ impl BrotliCodec {
+ /// Creates new Brotli compression codec.
+ pub(crate) fn new() -> Self {
+ Self {}
+ }
}
- fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
- let mut encoder = brotli::CompressorWriter::new(
- output_buf,
- BROTLI_DEFAULT_BUFFER_SIZE,
- BROTLI_DEFAULT_COMPRESSION_QUALITY,
- BROTLI_DEFAULT_LG_WINDOW_SIZE,
- );
- encoder.write_all(&input_buf[..])?;
- encoder.flush().map_err(|e| e.into())
+ impl Codec for BrotliCodec {
+ fn decompress(
+ &mut self,
+ input_buf: &[u8],
+ output_buf: &mut Vec<u8>,
+ ) -> Result<usize> {
+ brotli::Decompressor::new(input_buf, BROTLI_DEFAULT_BUFFER_SIZE)
+ .read_to_end(output_buf)
+ .map_err(|e| e.into())
+ }
+
+ fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
+ let mut encoder = brotli::CompressorWriter::new(
+ output_buf,
+ BROTLI_DEFAULT_BUFFER_SIZE,
+ BROTLI_DEFAULT_COMPRESSION_QUALITY,
+ BROTLI_DEFAULT_LG_WINDOW_SIZE,
+ );
+ encoder.write_all(&input_buf[..])?;
+ encoder.flush().map_err(|e| e.into())
+ }
}
}
+#[cfg(any(feature = "brotli", test))]
+pub use brotli_codec::*;
+
+#[cfg(any(feature = "lz4", test))]
+mod lz4_codec {
+ use std::io::{Read, Write};
+
+ use crate::compression::Codec;
+ use crate::errors::Result;
-const LZ4_BUFFER_SIZE: usize = 4096;
+ const LZ4_BUFFER_SIZE: usize = 4096;
-/// Codec for LZ4 compression algorithm.
-pub struct LZ4Codec {}
+ /// Codec for LZ4 compression algorithm.
+ pub struct LZ4Codec {}
-impl LZ4Codec {
- /// Creates new LZ4 compression codec.
- fn new() -> Self {
- Self {}
+ impl LZ4Codec {
+ /// Creates new LZ4 compression codec.
+ pub(crate) fn new() -> Self {
+ Self {}
+ }
}
-}
-impl Codec for LZ4Codec {
- fn decompress(
- &mut self,
- input_buf: &[u8],
- output_buf: &mut Vec<u8>,
- ) -> Result<usize> {
- let mut decoder = lz4::Decoder::new(input_buf)?;
- let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE];
- let mut total_len = 0;
- loop {
- let len = decoder.read(&mut buffer)?;
- if len == 0 {
- break;
+ impl Codec for LZ4Codec {
+ fn decompress(
+ &mut self,
+ input_buf: &[u8],
+ output_buf: &mut Vec<u8>,
+ ) -> Result<usize> {
+ let mut decoder = lz4::Decoder::new(input_buf)?;
+ let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE];
+ let mut total_len = 0;
+ loop {
+ let len = decoder.read(&mut buffer)?;
+ if len == 0 {
+ break;
+ }
+ total_len += len;
+ output_buf.write_all(&buffer[0..len])?;
}
- total_len += len;
- output_buf.write_all(&buffer[0..len])?;
+ Ok(total_len)
}
- Ok(total_len)
- }
- fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
- let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?;
- let mut from = 0;
- loop {
- let to = std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len());
- encoder.write_all(&input_buf[from..to])?;
- from += LZ4_BUFFER_SIZE;
- if from >= input_buf.len() {
- break;
+ fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
+ let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?;
+ let mut from = 0;
+ loop {
+ let to = std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len());
+ encoder.write_all(&input_buf[from..to])?;
+ from += LZ4_BUFFER_SIZE;
+ if from >= input_buf.len() {
+ break;
+ }
}
+ encoder.finish().1.map_err(|e| e.into())
}
- encoder.finish().1.map_err(|e| e.into())
}
}
+#[cfg(any(feature = "lz4", test))]
+pub use lz4_codec::*;
-/// Codec for Zstandard compression algorithm.
-pub struct ZSTDCodec {}
+#[cfg(any(feature = "zstd", test))]
+mod zstd_codec {
+ use std::io::{self, Write};
-impl ZSTDCodec {
- /// Creates new Zstandard compression codec.
- fn new() -> Self {
- Self {}
- }
-}
+ use crate::compression::Codec;
+ use crate::errors::Result;
-/// Compression level (1-21) for ZSTD. Choose 1 here for better compression speed.
-const ZSTD_COMPRESSION_LEVEL: i32 = 1;
-
-impl Codec for ZSTDCodec {
- fn decompress(
- &mut self,
- input_buf: &[u8],
- output_buf: &mut Vec<u8>,
- ) -> Result<usize> {
- let mut decoder = zstd::Decoder::new(input_buf)?;
- match io::copy(&mut decoder, output_buf) {
- Ok(n) => Ok(n as usize),
- Err(e) => Err(e.into()),
+ /// Codec for Zstandard compression algorithm.
+ pub struct ZSTDCodec {}
+
+ impl ZSTDCodec {
+ /// Creates new Zstandard compression codec.
+ pub(crate) fn new() -> Self {
+ Self {}
}
}
- fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
- let mut encoder = zstd::Encoder::new(output_buf, ZSTD_COMPRESSION_LEVEL)?;
- encoder.write_all(&input_buf[..])?;
- match encoder.finish() {
- Ok(_) => Ok(()),
- Err(e) => Err(e.into()),
+ /// Compression level (1-21) for ZSTD. Choose 1 here for better compression speed.
+ const ZSTD_COMPRESSION_LEVEL: i32 = 1;
+
+ impl Codec for ZSTDCodec {
+ fn decompress(
+ &mut self,
+ input_buf: &[u8],
+ output_buf: &mut Vec<u8>,
+ ) -> Result<usize> {
+ let mut decoder = zstd::Decoder::new(input_buf)?;
+ match io::copy(&mut decoder, output_buf) {
+ Ok(n) => Ok(n as usize),
+ Err(e) => Err(e.into()),
+ }
+ }
+
+ fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
+ let mut encoder = zstd::Encoder::new(output_buf, ZSTD_COMPRESSION_LEVEL)?;
+ encoder.write_all(&input_buf[..])?;
+ match encoder.finish() {
+ Ok(_) => Ok(()),
+ Err(e) => Err(e.into()),
+ }
}
}
}
+#[cfg(any(feature = "zstd", test))]
+pub use zstd_codec::*;
#[cfg(test)]
mod tests {
diff --git a/rust/parquet/src/errors.rs b/rust/parquet/src/errors.rs
index e532834..18673a4 100644
--- a/rust/parquet/src/errors.rs
+++ b/rust/parquet/src/errors.rs
@@ -19,46 +19,87 @@
use std::{cell, convert, io, result, str};
+#[cfg(any(feature = "arrow", test))]
use arrow::error::ArrowError;
-use quick_error::quick_error;
-use snap;
-use thrift;
-
-quick_error! {
- /// Set of errors that can be produced during different operations in Parquet.
- #[derive(Debug, PartialEq)]
- pub enum ParquetError {
- /// General Parquet error.
- /// Returned when code violates normal workflow of working with Parquet files.
- General(message: String) {
- display("Parquet error: {}", message)
- from(e: io::Error) -> (format!("underlying IO error: {}", e))
- from(e: snap::Error) -> (format!("underlying snap error: {}", e))
- from(e: thrift::Error) -> (format!("underlying Thrift error: {}", e))
- from(e: cell::BorrowMutError) -> (format!("underlying borrow error: {}", e))
- from(e: str::Utf8Error) -> (format!("underlying utf8 error: {}", e))
- }
- /// "Not yet implemented" Parquet error.
- /// Returned when functionality is not yet available.
- NYI(message: String) {
- display("NYI: {}", message)
- }
- /// "End of file" Parquet error.
- /// Returned when IO related failures occur, e.g. when there are not enough bytes to
- /// decode.
- EOF(message: String) {
- display("EOF: {}", message)
- }
- /// Arrow error.
- /// Returned when reading into arrow or writing from arrow.
- ArrowError(message: String) {
- display("Arrow: {}", message)
- from(e: ArrowError) -> (format!("underlying Arrow error: {:?}", e))
- }
- IndexOutOfBound(index: usize, bound: usize) {
- display("Index {} out of bound: {}", index, bound)
- }
- }
+
+#[derive(Debug, PartialEq)]
+pub enum ParquetError {
+ /// General Parquet error.
+ /// Returned when code violates normal workflow of working with Parquet files.
+ General(String),
+ /// "Not yet implemented" Parquet error.
+ /// Returned when functionality is not yet available.
+ NYI(String),
+ /// "End of file" Parquet error.
+ /// Returned when IO related failures occur, e.g. when there are not enough bytes to
+ /// decode.
+ EOF(String),
+ #[cfg(any(feature = "arrow", test))]
+ /// Arrow error.
+ /// Returned when reading into arrow or writing from arrow.
+ ArrowError(String),
+ IndexOutOfBound(usize, usize),
+}
+
+impl std::fmt::Display for ParquetError {
+ fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
+ match *self {
+ ParquetError::General(ref message) => {
+ write!(fmt, "Parquet error: {}", message)
+ }
+ ParquetError::NYI(ref message) => write!(fmt, "NYI: {}", message),
+ ParquetError::EOF(ref message) => write!(fmt, "EOF: {}", message),
+ #[cfg(any(feature = "arrow", test))]
+ ParquetError::ArrowError(ref message) => write!(fmt, "Arrow: {}", message),
+ ParquetError::IndexOutOfBound(ref index, ref bound) => {
+ write!(fmt, "Index {} out of bound: {}", index, bound)
+ }
+ }
+ }
+}
+
+impl std::error::Error for ParquetError {
+ fn cause(&self) -> Option<&::std::error::Error> {
+ None
+ }
+}
+
+impl From<io::Error> for ParquetError {
+ fn from(e: io::Error) -> ParquetError {
+ ParquetError::General(format!("underlying IO error: {}", e))
+ }
+}
+
+#[cfg(any(feature = "snap", test))]
+impl From<snap::Error> for ParquetError {
+ fn from(e: snap::Error) -> ParquetError {
+ ParquetError::General(format!("underlying snap error: {}", e))
+ }
+}
+
+impl From<thrift::Error> for ParquetError {
+ fn from(e: thrift::Error) -> ParquetError {
+ ParquetError::General(format!("underlying Thrift error: {}", e))
+ }
+}
+
+impl From<cell::BorrowMutError> for ParquetError {
+ fn from(e: cell::BorrowMutError) -> ParquetError {
+ ParquetError::General(format!("underlying borrow error: {}", e))
+ }
+}
+
+impl From<str::Utf8Error> for ParquetError {
+ fn from(e: str::Utf8Error) -> ParquetError {
+ ParquetError::General(format!("underlying utf8 error: {}", e))
+ }
+}
+
+#[cfg(any(feature = "arrow", test))]
+impl From<ArrowError> for ParquetError {
+ fn from(e: ArrowError) -> ParquetError {
+ ParquetError::ArrowError(format!("underlying Arrow error: {}", e))
+ }
}
/// A specialized `Result` for Parquet errors.
@@ -97,6 +138,7 @@ macro_rules! eof_err {
// ----------------------------------------------------------------------
// Convert parquet error into other errors
+#[cfg(any(feature = "arrow", test))]
impl Into<ArrowError> for ParquetError {
fn into(self) -> ArrowError {
ArrowError::ParquetError(format!("{}", self))
diff --git a/rust/parquet/src/lib.rs b/rust/parquet/src/lib.rs
index 8d23e89..6d5a6b3 100644
--- a/rust/parquet/src/lib.rs
+++ b/rust/parquet/src/lib.rs
@@ -31,6 +31,7 @@ pub use self::util::memory;
#[macro_use]
mod util;
+#[cfg(any(feature = "arrow", test))]
pub mod arrow;
pub mod column;
pub mod compression;