This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 4222f5aa4 Remove fallibility from RLEEncoder (#2226) (#2259)
4222f5aa4 is described below
commit 4222f5aa40c3d0c0821fc61c1c4091176ebb70f2
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Aug 2 14:13:38 2022 +0100
Remove fallibility from RLEEncoder (#2226) (#2259)
---
parquet/src/arrow/arrow_writer/byte_array.rs | 14 ++---
.../src/arrow/record_reader/definition_levels.rs | 8 +--
parquet/src/column/writer/mod.rs | 16 ++---
parquet/src/encodings/encoding/dict_encoder.rs | 8 +--
parquet/src/encodings/encoding/mod.rs | 6 +-
parquet/src/encodings/levels.rs | 35 +++++------
parquet/src/encodings/rle.rs | 71 +++++++++-------------
parquet/src/util/test_common/page_util.rs | 4 +-
8 files changed, 70 insertions(+), 92 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index 52698a31b..d1a0da5b3 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -374,7 +374,7 @@ impl DictEncoder {
&mut self,
min_value: Option<ByteArray>,
max_value: Option<ByteArray>,
- ) -> Result<DataPageValues<ByteArray>> {
+ ) -> DataPageValues<ByteArray> {
let num_values = self.indices.len();
let buffer_len = self.estimated_data_page_size();
let mut buffer = Vec::with_capacity(buffer_len);
@@ -382,20 +382,18 @@ impl DictEncoder {
let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
for index in &self.indices {
- if !encoder.put(*index as u64)? {
- return Err(general_err!("Encoder doesn't have enough space"));
- }
+ encoder.put(*index as u64)
}
self.indices.clear();
- Ok(DataPageValues {
- buf: encoder.consume()?.into(),
+ DataPageValues {
+ buf: encoder.consume().into(),
num_values,
encoding: Encoding::RLE_DICTIONARY,
min_value,
max_value,
- })
+ }
}
}
@@ -500,7 +498,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
let max_value = self.max_value.take();
match &mut self.dict_encoder {
- Some(encoder) => encoder.flush_data_page(min_value, max_value),
+ Some(encoder) => Ok(encoder.flush_data_page(min_value, max_value)),
_ => self.fallback.flush_data_page(min_value, max_value),
}
}
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs
index 53eeab9a5..2d65db77f 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -408,12 +408,12 @@ mod tests {
let mut encoder = RleEncoder::new(1, 1024);
for _ in 0..len {
let bool = rng.gen_bool(0.8);
- assert!(encoder.put(bool as u64).unwrap());
+ encoder.put(bool as u64);
expected.append(bool);
}
assert_eq!(expected.len(), len);
- let encoded = encoder.consume().unwrap();
+ let encoded = encoder.consume();
let mut decoder = PackedDecoder::new();
decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded));
@@ -444,7 +444,7 @@ mod tests {
let mut total_value = 0;
for _ in 0..len {
let bool = rng.gen_bool(0.8);
- assert!(encoder.put(bool as u64).unwrap());
+ encoder.put(bool as u64);
expected.append(bool);
if bool {
total_value += 1;
@@ -452,7 +452,7 @@ mod tests {
}
assert_eq!(expected.len(), len);
- let encoded = encoder.consume().unwrap();
+ let encoded = encoder.consume();
let mut decoder = PackedDecoder::new();
decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded));
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 6c467b5e4..ce773c19d 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -630,7 +630,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Encoding::RLE,
&self.rep_levels_sink[..],
max_rep_level,
- )?[..],
+ )[..],
);
}
@@ -640,7 +640,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Encoding::RLE,
&self.def_levels_sink[..],
max_def_level,
- )?[..],
+ )[..],
);
}
@@ -671,14 +671,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
if max_rep_level > 0 {
let levels =
- self.encode_levels_v2(&self.rep_levels_sink[..],
max_rep_level)?;
+ self.encode_levels_v2(&self.rep_levels_sink[..],
max_rep_level);
rep_levels_byte_len = levels.len();
buffer.extend_from_slice(&levels[..]);
}
if max_def_level > 0 {
let levels =
- self.encode_levels_v2(&self.def_levels_sink[..],
max_def_level)?;
+ self.encode_levels_v2(&self.def_levels_sink[..],
max_def_level);
def_levels_byte_len = levels.len();
buffer.extend_from_slice(&levels[..]);
}
@@ -794,18 +794,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
encoding: Encoding,
levels: &[i16],
max_level: i16,
- ) -> Result<Vec<u8>> {
+ ) -> Vec<u8> {
let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
- encoder.put(levels)?;
+ encoder.put(levels);
encoder.consume()
}
/// Encodes definition or repetition levels for Data Page v2.
/// Encoding is always RLE.
#[inline]
- fn encode_levels_v2(&self, levels: &[i16], max_level: i16) ->
Result<Vec<u8>> {
+ fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
let mut encoder = LevelEncoder::v2(max_level, levels.len());
- encoder.put(levels)?;
+ encoder.put(levels);
encoder.consume()
}
diff --git a/parquet/src/encodings/encoding/dict_encoder.rs b/parquet/src/encodings/encoding/dict_encoder.rs
index 8f3c98aca..a7855cc84 100644
--- a/parquet/src/encodings/encoding/dict_encoder.rs
+++ b/parquet/src/encodings/encoding/dict_encoder.rs
@@ -23,7 +23,7 @@ use crate::data_type::private::ParquetValueType;
use crate::data_type::DataType;
use crate::encodings::encoding::{Encoder, PlainEncoder};
use crate::encodings::rle::RleEncoder;
-use crate::errors::{ParquetError, Result};
+use crate::errors::Result;
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
@@ -132,12 +132,10 @@ impl<T: DataType> DictEncoder<T> {
// Write bit width in the first byte
let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
for index in &self.indices {
- if !encoder.put(*index as u64)? {
- return Err(general_err!("Encoder doesn't have enough space"));
- }
+ encoder.put(*index as u64)
}
self.indices.clear();
- Ok(ByteBufferPtr::new(encoder.consume()?))
+ Ok(ByteBufferPtr::new(encoder.consume()))
}
fn put_one(&mut self, value: &T::T) {
diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs
index 383211f12..b0c8fa10f 100644
--- a/parquet/src/encodings/encoding/mod.rs
+++ b/parquet/src/encodings/encoding/mod.rs
@@ -195,9 +195,7 @@ impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
for value in values {
let value = value.as_u64()?;
- if !rle_encoder.put(value)? {
- return Err(general_err!("RLE buffer is full"));
- }
+ rle_encoder.put(value)
}
Ok(())
}
@@ -227,7 +225,7 @@ impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
.expect("RLE value encoder is not initialized");
// Flush all encoder buffers and raw values
- let mut buf = rle_encoder.consume()?;
+ let mut buf = rle_encoder.consume();
assert!(buf.len() > 4, "should have had padding inserted");
// Note that buf does not have any offset, all data is encoded bytes
diff --git a/parquet/src/encodings/levels.rs b/parquet/src/encodings/levels.rs
index 8bdfdd3e9..62c68d843 100644
--- a/parquet/src/encodings/levels.rs
+++ b/parquet/src/encodings/levels.rs
@@ -21,7 +21,7 @@ use super::rle::{RleDecoder, RleEncoder};
use crate::basic::Encoding;
use crate::data_type::AsBytes;
-use crate::errors::{ParquetError, Result};
+use crate::errors::Result;
use crate::util::{
bit_util::{ceil, num_required_bits, BitReader, BitWriter},
memory::ByteBufferPtr,
@@ -97,21 +97,16 @@ impl LevelEncoder {
/// Put/encode levels vector into this level encoder.
/// Returns number of encoded values that are less than or equal to length
of the
/// input buffer.
- ///
- /// RLE and BIT_PACKED level encoders return Err() when internal buffer
overflows or
- /// flush fails.
#[inline]
- pub fn put(&mut self, buffer: &[i16]) -> Result<usize> {
+ pub fn put(&mut self, buffer: &[i16]) -> usize {
let mut num_encoded = 0;
match *self {
LevelEncoder::Rle(ref mut encoder) | LevelEncoder::RleV2(ref mut
encoder) => {
for value in buffer {
- if !encoder.put(*value as u64)? {
- return Err(general_err!("RLE buffer is full"));
- }
+ encoder.put(*value as u64);
num_encoded += 1;
}
- encoder.flush()?;
+ encoder.flush();
}
LevelEncoder::BitPacked(bit_width, ref mut encoder) => {
for value in buffer {
@@ -121,25 +116,25 @@ impl LevelEncoder {
encoder.flush();
}
}
- Ok(num_encoded)
+ num_encoded
}
/// Finalizes level encoder, flush all intermediate buffers and return
resulting
/// encoded buffer. Returned buffer is already truncated to encoded bytes
only.
#[inline]
- pub fn consume(self) -> Result<Vec<u8>> {
+ pub fn consume(self) -> Vec<u8> {
match self {
LevelEncoder::Rle(encoder) => {
- let mut encoded_data = encoder.consume()?;
+ let mut encoded_data = encoder.consume();
// Account for the buffer offset
let encoded_len = encoded_data.len() - mem::size_of::<i32>();
let len = (encoded_len as i32).to_le();
let len_bytes = len.as_bytes();
encoded_data[0..len_bytes.len()].copy_from_slice(len_bytes);
- Ok(encoded_data)
+ encoded_data
}
LevelEncoder::RleV2(encoder) => encoder.consume(),
- LevelEncoder::BitPacked(_, encoder) => Ok(encoder.consume()),
+ LevelEncoder::BitPacked(_, encoder) => encoder.consume(),
}
}
}
@@ -287,8 +282,8 @@ mod tests {
} else {
LevelEncoder::v1(enc, max_level, levels.len())
};
- encoder.put(levels).expect("put() should be OK");
- let encoded_levels = encoder.consume().expect("consume() should be
OK");
+ encoder.put(levels);
+ let encoded_levels = encoder.consume();
let byte_buf = ByteBufferPtr::new(encoded_levels);
let mut decoder;
@@ -318,8 +313,8 @@ mod tests {
} else {
LevelEncoder::v1(enc, max_level, levels.len())
};
- encoder.put(levels).expect("put() should be OK");
- let encoded_levels = encoder.consume().expect("consume() should be
OK");
+ encoder.put(levels);
+ let encoded_levels = encoder.consume();
let byte_buf = ByteBufferPtr::new(encoded_levels);
let mut decoder;
@@ -366,8 +361,8 @@ mod tests {
LevelEncoder::v1(enc, max_level, levels.len())
};
// Encode only one value
- let num_encoded = encoder.put(&levels[0..1]).expect("put() should be
OK");
- let encoded_levels = encoder.consume().expect("consume() should be
OK");
+ let num_encoded = encoder.put(&levels[0..1]);
+ let encoded_levels = encoder.consume();
assert_eq!(num_encoded, 1);
let byte_buf = ByteBufferPtr::new(encoded_levels);
diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs
index 28ebd7d3a..aad833e0e 100644
--- a/parquet/src/encodings/rle.rs
+++ b/parquet/src/encodings/rle.rs
@@ -132,23 +132,21 @@ impl RleEncoder {
}
/// Encodes `value`, which must be representable with `bit_width` bits.
- /// Returns true if the value fits in buffer, false if it doesn't, or
- /// error if something is wrong.
#[inline]
- pub fn put(&mut self, value: u64) -> Result<bool> {
+ pub fn put(&mut self, value: u64) {
// This function buffers 8 values at a time. After seeing 8 values, it
// decides whether the current run should be encoded in bit-packed or
RLE.
if self.current_value == value {
self.repeat_count += 1;
if self.repeat_count > 8 {
// A continuation of last value. No need to buffer.
- return Ok(true);
+ return;
}
} else {
if self.repeat_count >= 8 {
// The current RLE run has ended and we've gathered enough.
Flush first.
assert_eq!(self.bit_packed_count, 0);
- self.flush_rle_run()?;
+ self.flush_rle_run();
}
self.repeat_count = 1;
self.current_value = value;
@@ -159,10 +157,8 @@ impl RleEncoder {
if self.num_buffered_values == 8 {
// Buffered values are full. Flush them.
assert_eq!(self.bit_packed_count % 8, 0);
- self.flush_buffered_values()?;
+ self.flush_buffered_values();
}
-
- Ok(true)
}
#[inline]
@@ -180,17 +176,17 @@ impl RleEncoder {
}
#[inline]
- pub fn consume(mut self) -> Result<Vec<u8>> {
- self.flush()?;
- Ok(self.bit_writer.consume())
+ pub fn consume(mut self) -> Vec<u8> {
+ self.flush();
+ self.bit_writer.consume()
}
/// Borrow equivalent of the `consume` method.
/// Call `clear()` after invoking this method.
#[inline]
- pub fn flush_buffer(&mut self) -> Result<&[u8]> {
- self.flush()?;
- Ok(self.bit_writer.flush_buffer())
+ pub fn flush_buffer(&mut self) -> &[u8] {
+ self.flush();
+ self.bit_writer.flush_buffer()
}
/// Clears the internal state so this encoder can be reused (e.g., after
becoming
@@ -208,7 +204,7 @@ impl RleEncoder {
/// Flushes all remaining values and return the final byte buffer
maintained by the
/// internal writer.
#[inline]
- pub fn flush(&mut self) -> Result<()> {
+ pub fn flush(&mut self) {
if self.bit_packed_count > 0
|| self.repeat_count > 0
|| self.num_buffered_values > 0
@@ -217,7 +213,7 @@ impl RleEncoder {
&& (self.repeat_count == self.num_buffered_values
|| self.num_buffered_values == 0);
if self.repeat_count > 0 && all_repeat {
- self.flush_rle_run()?;
+ self.flush_rle_run();
} else {
// Buffer the last group of bit-packed values to 8 by padding
with 0s.
if self.num_buffered_values > 0 {
@@ -227,14 +223,13 @@ impl RleEncoder {
}
}
self.bit_packed_count += self.num_buffered_values;
- self.flush_bit_packed_run(true)?;
+ self.flush_bit_packed_run(true);
self.repeat_count = 0;
}
}
- Ok(())
}
- fn flush_rle_run(&mut self) -> Result<()> {
+ fn flush_rle_run(&mut self) {
assert!(self.repeat_count > 0);
let indicator_value = self.repeat_count << 1;
self.bit_writer.put_vlq_int(indicator_value as u64);
@@ -244,10 +239,9 @@ impl RleEncoder {
);
self.num_buffered_values = 0;
self.repeat_count = 0;
- Ok(())
}
- fn flush_bit_packed_run(&mut self, update_indicator_byte: bool) ->
Result<()> {
+ fn flush_bit_packed_run(&mut self, update_indicator_byte: bool) {
if self.indicator_byte_pos < 0 {
self.indicator_byte_pos = self.bit_writer.skip(1) as i64;
}
@@ -270,20 +264,19 @@ impl RleEncoder {
self.indicator_byte_pos = -1;
self.bit_packed_count = 0;
}
- Ok(())
}
#[inline(never)]
- fn flush_buffered_values(&mut self) -> Result<()> {
+ fn flush_buffered_values(&mut self) {
if self.repeat_count >= 8 {
self.num_buffered_values = 0;
if self.bit_packed_count > 0 {
// In this case we choose RLE encoding. Flush the current
buffered values
// as bit-packed encoding.
assert_eq!(self.bit_packed_count % 8, 0);
- self.flush_bit_packed_run(true)?
+ self.flush_bit_packed_run(true)
}
- return Ok(());
+ return;
}
self.bit_packed_count += self.num_buffered_values;
@@ -292,12 +285,11 @@ impl RleEncoder {
// We've reached the maximum value that can be hold in a single
bit-packed
// run.
assert!(self.indicator_byte_pos >= 0);
- self.flush_bit_packed_run(true)?;
+ self.flush_bit_packed_run(true);
} else {
- self.flush_bit_packed_run(false)?;
+ self.flush_bit_packed_run(false);
}
self.repeat_count = 0;
- Ok(())
}
}
@@ -585,11 +577,11 @@ mod tests {
let mut encoder1 = RleEncoder::new(3, 256);
let mut encoder2 = RleEncoder::new(3, 256);
for value in data {
- encoder1.put(value as u64).unwrap();
- encoder2.put(value as u64).unwrap();
+ encoder1.put(value as u64);
+ encoder2.put(value as u64);
}
- let res1 = encoder1.flush_buffer().unwrap();
- let res2 = encoder2.consume().unwrap();
+ let res1 = encoder1.flush_buffer();
+ let res2 = encoder2.consume();
assert_eq!(res1, &res2[..]);
}
@@ -763,10 +755,9 @@ mod tests {
let buffer_len = 64 * 1024;
let mut encoder = RleEncoder::new(bit_width, buffer_len);
for v in values {
- let result = encoder.put(*v as u64);
- assert!(result.is_ok());
+ encoder.put(*v as u64)
}
- let buffer = ByteBufferPtr::new(encoder.consume().expect("Expect
consume() OK"));
+ let buffer = ByteBufferPtr::new(encoder.consume());
if expected_len != -1 {
assert_eq!(buffer.len(), expected_len as usize);
}
@@ -919,9 +910,9 @@ mod tests {
let values: Vec<i16> = vec![0, 1, 1, 1, 1, 0, 0, 0, 0, 1];
let mut encoder = RleEncoder::new(bit_width, buffer_len);
for v in &values {
- assert!(encoder.put(*v as u64).expect("put() should be OK"));
+ encoder.put(*v as u64)
}
- let buffer = encoder.consume().expect("consume() should be OK");
+ let buffer = encoder.consume();
let mut decoder = RleDecoder::new(bit_width);
decoder.set_data(ByteBufferPtr::new(buffer));
let mut actual_values: Vec<i16> = vec![0; values.len()];
@@ -935,12 +926,10 @@ mod tests {
let buffer_len = 64 * 1024;
let mut encoder = RleEncoder::new(bit_width, buffer_len);
for v in values {
- let result = encoder.put(*v as u64).expect("put() should be OK");
- assert!(result, "put() should not return false");
+ encoder.put(*v as u64)
}
- let buffer =
- ByteBufferPtr::new(encoder.consume().expect("consume() should be
OK"));
+ let buffer = ByteBufferPtr::new(encoder.consume());
// Verify read
let mut decoder = RleDecoder::new(bit_width);
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index d7653d4e5..bc197d00e 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -75,8 +75,8 @@ impl DataPageBuilderImpl {
return 0;
}
let mut level_encoder = LevelEncoder::v1(Encoding::RLE, max_level,
levels.len());
- level_encoder.put(levels).expect("put() should be OK");
- let encoded_levels = level_encoder.consume().expect("consume() should
be OK");
+ level_encoder.put(levels);
+ let encoded_levels = level_encoder.consume();
// Actual encoded bytes (without length offset)
let encoded_bytes = &encoded_levels[mem::size_of::<i32>()..];
if self.datapage_v2 {