This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 4222f5aa4 Remove fallibility from RLEEncoder (#2226) (#2259)
4222f5aa4 is described below

commit 4222f5aa40c3d0c0821fc61c1c4091176ebb70f2
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Aug 2 14:13:38 2022 +0100

    Remove fallibility from RLEEncoder (#2226) (#2259)
---
 parquet/src/arrow/arrow_writer/byte_array.rs       | 14 ++---
 .../src/arrow/record_reader/definition_levels.rs   |  8 +--
 parquet/src/column/writer/mod.rs                   | 16 ++---
 parquet/src/encodings/encoding/dict_encoder.rs     |  8 +--
 parquet/src/encodings/encoding/mod.rs              |  6 +-
 parquet/src/encodings/levels.rs                    | 35 +++++------
 parquet/src/encodings/rle.rs                       | 71 +++++++++-------------
 parquet/src/util/test_common/page_util.rs          |  4 +-
 8 files changed, 70 insertions(+), 92 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index 52698a31b..d1a0da5b3 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -374,7 +374,7 @@ impl DictEncoder {
         &mut self,
         min_value: Option<ByteArray>,
         max_value: Option<ByteArray>,
-    ) -> Result<DataPageValues<ByteArray>> {
+    ) -> DataPageValues<ByteArray> {
         let num_values = self.indices.len();
         let buffer_len = self.estimated_data_page_size();
         let mut buffer = Vec::with_capacity(buffer_len);
@@ -382,20 +382,18 @@ impl DictEncoder {
 
         let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
         for index in &self.indices {
-            if !encoder.put(*index as u64)? {
-                return Err(general_err!("Encoder doesn't have enough space"));
-            }
+            encoder.put(*index as u64)
         }
 
         self.indices.clear();
 
-        Ok(DataPageValues {
-            buf: encoder.consume()?.into(),
+        DataPageValues {
+            buf: encoder.consume().into(),
             num_values,
             encoding: Encoding::RLE_DICTIONARY,
             min_value,
             max_value,
-        })
+        }
     }
 }
 
@@ -500,7 +498,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
         let max_value = self.max_value.take();
 
         match &mut self.dict_encoder {
-            Some(encoder) => encoder.flush_data_page(min_value, max_value),
+            Some(encoder) => Ok(encoder.flush_data_page(min_value, max_value)),
             _ => self.fallback.flush_data_page(min_value, max_value),
         }
     }
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs
index 53eeab9a5..2d65db77f 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -408,12 +408,12 @@ mod tests {
         let mut encoder = RleEncoder::new(1, 1024);
         for _ in 0..len {
             let bool = rng.gen_bool(0.8);
-            assert!(encoder.put(bool as u64).unwrap());
+            encoder.put(bool as u64);
             expected.append(bool);
         }
         assert_eq!(expected.len(), len);
 
-        let encoded = encoder.consume().unwrap();
+        let encoded = encoder.consume();
         let mut decoder = PackedDecoder::new();
         decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded));
 
@@ -444,7 +444,7 @@ mod tests {
         let mut total_value = 0;
         for _ in 0..len {
             let bool = rng.gen_bool(0.8);
-            assert!(encoder.put(bool as u64).unwrap());
+            encoder.put(bool as u64);
             expected.append(bool);
             if bool {
                 total_value += 1;
@@ -452,7 +452,7 @@ mod tests {
         }
         assert_eq!(expected.len(), len);
 
-        let encoded = encoder.consume().unwrap();
+        let encoded = encoder.consume();
         let mut decoder = PackedDecoder::new();
         decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded));
 
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 6c467b5e4..ce773c19d 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -630,7 +630,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                             Encoding::RLE,
                             &self.rep_levels_sink[..],
                             max_rep_level,
-                        )?[..],
+                        )[..],
                     );
                 }
 
@@ -640,7 +640,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                             Encoding::RLE,
                             &self.def_levels_sink[..],
                             max_def_level,
-                        )?[..],
+                        )[..],
                     );
                 }
 
@@ -671,14 +671,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
 
                 if max_rep_level > 0 {
                     let levels =
-                        self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level)?;
+                        self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                     rep_levels_byte_len = levels.len();
                     buffer.extend_from_slice(&levels[..]);
                 }
 
                 if max_def_level > 0 {
                     let levels =
-                        self.encode_levels_v2(&self.def_levels_sink[..], max_def_level)?;
+                        self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                     def_levels_byte_len = levels.len();
                     buffer.extend_from_slice(&levels[..]);
                 }
@@ -794,18 +794,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         encoding: Encoding,
         levels: &[i16],
         max_level: i16,
-    ) -> Result<Vec<u8>> {
+    ) -> Vec<u8> {
         let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
-        encoder.put(levels)?;
+        encoder.put(levels);
         encoder.consume()
     }
 
     /// Encodes definition or repetition levels for Data Page v2.
     /// Encoding is always RLE.
     #[inline]
-    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Result<Vec<u8>> {
+    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
         let mut encoder = LevelEncoder::v2(max_level, levels.len());
-        encoder.put(levels)?;
+        encoder.put(levels);
         encoder.consume()
     }
 
diff --git a/parquet/src/encodings/encoding/dict_encoder.rs b/parquet/src/encodings/encoding/dict_encoder.rs
index 8f3c98aca..a7855cc84 100644
--- a/parquet/src/encodings/encoding/dict_encoder.rs
+++ b/parquet/src/encodings/encoding/dict_encoder.rs
@@ -23,7 +23,7 @@ use crate::data_type::private::ParquetValueType;
 use crate::data_type::DataType;
 use crate::encodings::encoding::{Encoder, PlainEncoder};
 use crate::encodings::rle::RleEncoder;
-use crate::errors::{ParquetError, Result};
+use crate::errors::Result;
 use crate::schema::types::ColumnDescPtr;
 use crate::util::bit_util::num_required_bits;
 use crate::util::interner::{Interner, Storage};
@@ -132,12 +132,10 @@ impl<T: DataType> DictEncoder<T> {
         // Write bit width in the first byte
         let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
         for index in &self.indices {
-            if !encoder.put(*index as u64)? {
-                return Err(general_err!("Encoder doesn't have enough space"));
-            }
+            encoder.put(*index as u64)
         }
         self.indices.clear();
-        Ok(ByteBufferPtr::new(encoder.consume()?))
+        Ok(ByteBufferPtr::new(encoder.consume()))
     }
 
     fn put_one(&mut self, value: &T::T) {
diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs
index 383211f12..b0c8fa10f 100644
--- a/parquet/src/encodings/encoding/mod.rs
+++ b/parquet/src/encodings/encoding/mod.rs
@@ -195,9 +195,7 @@ impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
 
         for value in values {
             let value = value.as_u64()?;
-            if !rle_encoder.put(value)? {
-                return Err(general_err!("RLE buffer is full"));
-            }
+            rle_encoder.put(value)
         }
         Ok(())
     }
@@ -227,7 +225,7 @@ impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
             .expect("RLE value encoder is not initialized");
 
         // Flush all encoder buffers and raw values
-        let mut buf = rle_encoder.consume()?;
+        let mut buf = rle_encoder.consume();
         assert!(buf.len() > 4, "should have had padding inserted");
 
         // Note that buf does not have any offset, all data is encoded bytes
diff --git a/parquet/src/encodings/levels.rs b/parquet/src/encodings/levels.rs
index 8bdfdd3e9..62c68d843 100644
--- a/parquet/src/encodings/levels.rs
+++ b/parquet/src/encodings/levels.rs
@@ -21,7 +21,7 @@ use super::rle::{RleDecoder, RleEncoder};
 
 use crate::basic::Encoding;
 use crate::data_type::AsBytes;
-use crate::errors::{ParquetError, Result};
+use crate::errors::Result;
 use crate::util::{
     bit_util::{ceil, num_required_bits, BitReader, BitWriter},
     memory::ByteBufferPtr,
@@ -97,21 +97,16 @@ impl LevelEncoder {
     /// Put/encode levels vector into this level encoder.
     /// Returns number of encoded values that are less than or equal to length of the
     /// input buffer.
-    ///
-    /// RLE and BIT_PACKED level encoders return Err() when internal buffer overflows or
-    /// flush fails.
     #[inline]
-    pub fn put(&mut self, buffer: &[i16]) -> Result<usize> {
+    pub fn put(&mut self, buffer: &[i16]) -> usize {
         let mut num_encoded = 0;
         match *self {
             LevelEncoder::Rle(ref mut encoder) | LevelEncoder::RleV2(ref mut encoder) => {
                 for value in buffer {
-                    if !encoder.put(*value as u64)? {
-                        return Err(general_err!("RLE buffer is full"));
-                    }
+                    encoder.put(*value as u64);
                     num_encoded += 1;
                 }
-                encoder.flush()?;
+                encoder.flush();
             }
             LevelEncoder::BitPacked(bit_width, ref mut encoder) => {
                 for value in buffer {
@@ -121,25 +116,25 @@ impl LevelEncoder {
                 encoder.flush();
             }
         }
-        Ok(num_encoded)
+        num_encoded
     }
 
     /// Finalizes level encoder, flush all intermediate buffers and return resulting
     /// encoded buffer. Returned buffer is already truncated to encoded bytes only.
     #[inline]
-    pub fn consume(self) -> Result<Vec<u8>> {
+    pub fn consume(self) -> Vec<u8> {
         match self {
             LevelEncoder::Rle(encoder) => {
-                let mut encoded_data = encoder.consume()?;
+                let mut encoded_data = encoder.consume();
                 // Account for the buffer offset
                 let encoded_len = encoded_data.len() - mem::size_of::<i32>();
                 let len = (encoded_len as i32).to_le();
                 let len_bytes = len.as_bytes();
                 encoded_data[0..len_bytes.len()].copy_from_slice(len_bytes);
-                Ok(encoded_data)
+                encoded_data
             }
             LevelEncoder::RleV2(encoder) => encoder.consume(),
-            LevelEncoder::BitPacked(_, encoder) => Ok(encoder.consume()),
+            LevelEncoder::BitPacked(_, encoder) => encoder.consume(),
         }
     }
 }
@@ -287,8 +282,8 @@ mod tests {
         } else {
             LevelEncoder::v1(enc, max_level, levels.len())
         };
-        encoder.put(levels).expect("put() should be OK");
-        let encoded_levels = encoder.consume().expect("consume() should be OK");
+        encoder.put(levels);
+        let encoded_levels = encoder.consume();
 
         let byte_buf = ByteBufferPtr::new(encoded_levels);
         let mut decoder;
@@ -318,8 +313,8 @@ mod tests {
         } else {
             LevelEncoder::v1(enc, max_level, levels.len())
         };
-        encoder.put(levels).expect("put() should be OK");
-        let encoded_levels = encoder.consume().expect("consume() should be OK");
+        encoder.put(levels);
+        let encoded_levels = encoder.consume();
 
         let byte_buf = ByteBufferPtr::new(encoded_levels);
         let mut decoder;
@@ -366,8 +361,8 @@ mod tests {
             LevelEncoder::v1(enc, max_level, levels.len())
         };
         // Encode only one value
-        let num_encoded = encoder.put(&levels[0..1]).expect("put() should be OK");
-        let encoded_levels = encoder.consume().expect("consume() should be OK");
+        let num_encoded = encoder.put(&levels[0..1]);
+        let encoded_levels = encoder.consume();
         assert_eq!(num_encoded, 1);
 
         let byte_buf = ByteBufferPtr::new(encoded_levels);
diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs
index 28ebd7d3a..aad833e0e 100644
--- a/parquet/src/encodings/rle.rs
+++ b/parquet/src/encodings/rle.rs
@@ -132,23 +132,21 @@ impl RleEncoder {
     }
 
     /// Encodes `value`, which must be representable with `bit_width` bits.
-    /// Returns true if the value fits in buffer, false if it doesn't, or
-    /// error if something is wrong.
     #[inline]
-    pub fn put(&mut self, value: u64) -> Result<bool> {
+    pub fn put(&mut self, value: u64) {
         // This function buffers 8 values at a time. After seeing 8 values, it
         // decides whether the current run should be encoded in bit-packed or RLE.
         if self.current_value == value {
             self.repeat_count += 1;
             if self.repeat_count > 8 {
                 // A continuation of last value. No need to buffer.
-                return Ok(true);
+                return;
             }
         } else {
             if self.repeat_count >= 8 {
                 // The current RLE run has ended and we've gathered enough. Flush first.
                 assert_eq!(self.bit_packed_count, 0);
-                self.flush_rle_run()?;
+                self.flush_rle_run();
             }
             self.repeat_count = 1;
             self.current_value = value;
@@ -159,10 +157,8 @@ impl RleEncoder {
         if self.num_buffered_values == 8 {
             // Buffered values are full. Flush them.
             assert_eq!(self.bit_packed_count % 8, 0);
-            self.flush_buffered_values()?;
+            self.flush_buffered_values();
         }
-
-        Ok(true)
     }
 
     #[inline]
@@ -180,17 +176,17 @@ impl RleEncoder {
     }
 
     #[inline]
-    pub fn consume(mut self) -> Result<Vec<u8>> {
-        self.flush()?;
-        Ok(self.bit_writer.consume())
+    pub fn consume(mut self) -> Vec<u8> {
+        self.flush();
+        self.bit_writer.consume()
     }
 
     /// Borrow equivalent of the `consume` method.
     /// Call `clear()` after invoking this method.
     #[inline]
-    pub fn flush_buffer(&mut self) -> Result<&[u8]> {
-        self.flush()?;
-        Ok(self.bit_writer.flush_buffer())
+    pub fn flush_buffer(&mut self) -> &[u8] {
+        self.flush();
+        self.bit_writer.flush_buffer()
     }
 
     /// Clears the internal state so this encoder can be reused (e.g., after becoming
@@ -208,7 +204,7 @@ impl RleEncoder {
     /// Flushes all remaining values and return the final byte buffer maintained by the
     /// internal writer.
     #[inline]
-    pub fn flush(&mut self) -> Result<()> {
+    pub fn flush(&mut self) {
         if self.bit_packed_count > 0
             || self.repeat_count > 0
             || self.num_buffered_values > 0
@@ -217,7 +213,7 @@ impl RleEncoder {
                 && (self.repeat_count == self.num_buffered_values
                     || self.num_buffered_values == 0);
             if self.repeat_count > 0 && all_repeat {
-                self.flush_rle_run()?;
+                self.flush_rle_run();
             } else {
                 // Buffer the last group of bit-packed values to 8 by padding with 0s.
                 if self.num_buffered_values > 0 {
@@ -227,14 +223,13 @@ impl RleEncoder {
                     }
                 }
                 self.bit_packed_count += self.num_buffered_values;
-                self.flush_bit_packed_run(true)?;
+                self.flush_bit_packed_run(true);
                 self.repeat_count = 0;
             }
         }
-        Ok(())
     }
 
-    fn flush_rle_run(&mut self) -> Result<()> {
+    fn flush_rle_run(&mut self) {
         assert!(self.repeat_count > 0);
         let indicator_value = self.repeat_count << 1;
         self.bit_writer.put_vlq_int(indicator_value as u64);
@@ -244,10 +239,9 @@ impl RleEncoder {
         );
         self.num_buffered_values = 0;
         self.repeat_count = 0;
-        Ok(())
     }
 
-    fn flush_bit_packed_run(&mut self, update_indicator_byte: bool) -> Result<()> {
+    fn flush_bit_packed_run(&mut self, update_indicator_byte: bool) {
         if self.indicator_byte_pos < 0 {
             self.indicator_byte_pos = self.bit_writer.skip(1) as i64;
         }
@@ -270,20 +264,19 @@ impl RleEncoder {
             self.indicator_byte_pos = -1;
             self.bit_packed_count = 0;
         }
-        Ok(())
     }
 
     #[inline(never)]
-    fn flush_buffered_values(&mut self) -> Result<()> {
+    fn flush_buffered_values(&mut self) {
         if self.repeat_count >= 8 {
             self.num_buffered_values = 0;
             if self.bit_packed_count > 0 {
                 // In this case we choose RLE encoding. Flush the current buffered values
                 // as bit-packed encoding.
                 assert_eq!(self.bit_packed_count % 8, 0);
-                self.flush_bit_packed_run(true)?
+                self.flush_bit_packed_run(true)
             }
-            return Ok(());
+            return;
         }
 
         self.bit_packed_count += self.num_buffered_values;
@@ -292,12 +285,11 @@ impl RleEncoder {
             // We've reached the maximum value that can be hold in a single bit-packed
             // run.
             assert!(self.indicator_byte_pos >= 0);
-            self.flush_bit_packed_run(true)?;
+            self.flush_bit_packed_run(true);
         } else {
-            self.flush_bit_packed_run(false)?;
+            self.flush_bit_packed_run(false);
         }
         self.repeat_count = 0;
-        Ok(())
     }
 }
 
@@ -585,11 +577,11 @@ mod tests {
         let mut encoder1 = RleEncoder::new(3, 256);
         let mut encoder2 = RleEncoder::new(3, 256);
         for value in data {
-            encoder1.put(value as u64).unwrap();
-            encoder2.put(value as u64).unwrap();
+            encoder1.put(value as u64);
+            encoder2.put(value as u64);
         }
-        let res1 = encoder1.flush_buffer().unwrap();
-        let res2 = encoder2.consume().unwrap();
+        let res1 = encoder1.flush_buffer();
+        let res2 = encoder2.consume();
         assert_eq!(res1, &res2[..]);
     }
 
@@ -763,10 +755,9 @@ mod tests {
         let buffer_len = 64 * 1024;
         let mut encoder = RleEncoder::new(bit_width, buffer_len);
         for v in values {
-            let result = encoder.put(*v as u64);
-            assert!(result.is_ok());
+            encoder.put(*v as u64)
         }
-        let buffer = ByteBufferPtr::new(encoder.consume().expect("Expect consume() OK"));
+        let buffer = ByteBufferPtr::new(encoder.consume());
         if expected_len != -1 {
             assert_eq!(buffer.len(), expected_len as usize);
         }
@@ -919,9 +910,9 @@ mod tests {
         let values: Vec<i16> = vec![0, 1, 1, 1, 1, 0, 0, 0, 0, 1];
         let mut encoder = RleEncoder::new(bit_width, buffer_len);
         for v in &values {
-            assert!(encoder.put(*v as u64).expect("put() should be OK"));
+            encoder.put(*v as u64)
         }
-        let buffer = encoder.consume().expect("consume() should be OK");
+        let buffer = encoder.consume();
         let mut decoder = RleDecoder::new(bit_width);
         decoder.set_data(ByteBufferPtr::new(buffer));
         let mut actual_values: Vec<i16> = vec![0; values.len()];
@@ -935,12 +926,10 @@ mod tests {
         let buffer_len = 64 * 1024;
         let mut encoder = RleEncoder::new(bit_width, buffer_len);
         for v in values {
-            let result = encoder.put(*v as u64).expect("put() should be OK");
-            assert!(result, "put() should not return false");
+            encoder.put(*v as u64)
         }
 
-        let buffer =
-            ByteBufferPtr::new(encoder.consume().expect("consume() should be OK"));
+        let buffer = ByteBufferPtr::new(encoder.consume());
 
         // Verify read
         let mut decoder = RleDecoder::new(bit_width);
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index d7653d4e5..bc197d00e 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -75,8 +75,8 @@ impl DataPageBuilderImpl {
             return 0;
         }
         let mut level_encoder = LevelEncoder::v1(Encoding::RLE, max_level, levels.len());
-        level_encoder.put(levels).expect("put() should be OK");
-        let encoded_levels = level_encoder.consume().expect("consume() should be OK");
+        level_encoder.put(levels);
+        let encoded_levels = level_encoder.consume();
         // Actual encoded bytes (without length offset)
         let encoded_bytes = &encoded_levels[mem::size_of::<i32>()..];
         if self.datapage_v2 {

Reply via email to