This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 7e54bb2238 [Parquet] Return error from `RleDecoder::reload` rather
than panic (#8729)
7e54bb2238 is described below
commit 7e54bb22389bf9682efb326a573e30380cdca0d8
Author: Liam Bao <[email protected]>
AuthorDate: Fri Oct 31 17:12:47 2025 -0400
[Parquet] Return error from `RleDecoder::reload` rather than panic (#8729)
# Which issue does this PR close?
- Closes #8632.
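# Rationale for this change
Running out of data while decoding an RLE run should be reported to the caller as an error rather than causing a panic inside `RleDecoder::reload`.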
# What changes are included in this PR?
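- Updated `RleDecoder::reload` to return `Result<bool>` instead of panicking when there is not enough data for RLE decoding.
- Updated `RleDecoder::set_data` and `ColumnLevelDecoder::set_data` to return `Result<()>`, and adjusted all callers to propagate the new return type.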
# Are these changes tested?
Yes, these changes are covered by existing tests.
# Are there any user-facing changes?
No
---
parquet/src/arrow/array_reader/byte_array.rs | 10 +--
.../arrow/array_reader/byte_array_dictionary.rs | 2 +-
parquet/src/arrow/array_reader/byte_view_array.rs | 10 +--
.../src/arrow/array_reader/fixed_len_byte_array.rs | 2 +-
parquet/src/arrow/decoder/dictionary_index.rs | 8 +-
.../src/arrow/record_reader/definition_levels.rs | 7 +-
parquet/src/column/reader.rs | 8 +-
parquet/src/column/reader/decoder.rs | 26 +++---
parquet/src/encodings/decoding.rs | 4 +-
parquet/src/encodings/rle.rs | 98 +++++++++++++---------
10 files changed, 98 insertions(+), 77 deletions(-)
diff --git a/parquet/src/arrow/array_reader/byte_array.rs
b/parquet/src/arrow/array_reader/byte_array.rs
index 5a495d2c60..0acbe65019 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -274,7 +274,7 @@ impl ByteArrayDecoder {
validate_utf8,
)),
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY =>
ByteArrayDecoder::Dictionary(
- ByteArrayDecoderDictionary::new(data, num_levels, num_values),
+ ByteArrayDecoderDictionary::new(data, num_levels, num_values)?,
),
Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteArrayDecoder::DeltaLength(
ByteArrayDecoderDeltaLength::new(data, validate_utf8)?,
@@ -563,10 +563,10 @@ pub struct ByteArrayDecoderDictionary {
}
impl ByteArrayDecoderDictionary {
- fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
- Self {
- decoder: DictIndexDecoder::new(data, num_levels, num_values),
- }
+ fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) ->
Result<Self> {
+ Ok(Self {
+ decoder: DictIndexDecoder::new(data, num_levels, num_values)?,
+ })
}
fn read<I: OffsetSizeTrait>(
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index 4afe4264cb..09de37a80e 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -293,7 +293,7 @@ where
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
let bit_width = data[0];
let mut decoder = RleDecoder::new(bit_width);
- decoder.set_data(data.slice(1..));
+ decoder.set_data(data.slice(1..))?;
MaybeDictionaryDecoder::Dict {
decoder,
max_remaining_values: num_values.unwrap_or(num_levels),
diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs
b/parquet/src/arrow/array_reader/byte_view_array.rs
index cc71647f4b..f881690f80 100644
--- a/parquet/src/arrow/array_reader/byte_view_array.rs
+++ b/parquet/src/arrow/array_reader/byte_view_array.rs
@@ -236,7 +236,7 @@ impl ByteViewArrayDecoder {
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new(
data, num_levels, num_values,
- ))
+ )?)
}
Encoding::DELTA_LENGTH_BYTE_ARRAY =>
ByteViewArrayDecoder::DeltaLength(
ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?,
@@ -426,10 +426,10 @@ pub struct ByteViewArrayDecoderDictionary {
}
impl ByteViewArrayDecoderDictionary {
- fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
- Self {
- decoder: DictIndexDecoder::new(data, num_levels, num_values),
- }
+ fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) ->
Result<Self> {
+ Ok(Self {
+ decoder: DictIndexDecoder::new(data, num_levels, num_values)?,
+ })
}
/// Reads the next indexes from self.decoder
diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index a37bef568d..2297926add 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -381,7 +381,7 @@ impl ColumnValueDecoder for ValueDecoder {
offset: 0,
},
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY =>
Decoder::Dict {
- decoder: DictIndexDecoder::new(data, num_levels, num_values),
+ decoder: DictIndexDecoder::new(data, num_levels, num_values)?,
},
Encoding::DELTA_BYTE_ARRAY => Decoder::Delta {
decoder: DeltaByteArrayDecoder::new(data)?,
diff --git a/parquet/src/arrow/decoder/dictionary_index.rs
b/parquet/src/arrow/decoder/dictionary_index.rs
index 38f2b05836..bb96f4bf98 100644
--- a/parquet/src/arrow/decoder/dictionary_index.rs
+++ b/parquet/src/arrow/decoder/dictionary_index.rs
@@ -42,18 +42,18 @@ pub struct DictIndexDecoder {
impl DictIndexDecoder {
/// Create a new [`DictIndexDecoder`] with the provided data page, the
number of levels
/// associated with this data page, and the number of non-null values (if
known)
- pub fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) ->
Self {
+ pub fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) ->
Result<Self> {
let bit_width = data[0];
let mut decoder = RleDecoder::new(bit_width);
- decoder.set_data(data.slice(1..));
+ decoder.set_data(data.slice(1..))?;
- Self {
+ Ok(Self {
decoder,
index_buf: Box::new([0; 1024]),
index_buf_len: 0,
index_offset: 0,
max_remaining_values: num_values.unwrap_or(num_levels),
- }
+ })
}
/// Read up to `len` values, returning the number of values read
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs
b/parquet/src/arrow/record_reader/definition_levels.rs
index 34b728d6fa..8fe26a9b52 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -131,11 +131,12 @@ impl DefinitionLevelBufferDecoder {
impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
type Buffer = DefinitionLevelBuffer;
- fn set_data(&mut self, encoding: Encoding, data: Bytes) {
+ fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> {
match &mut self.decoder {
MaybePacked::Packed(d) => d.set_data(encoding, data),
- MaybePacked::Fallback(d) => d.set_data(encoding, data),
- }
+ MaybePacked::Fallback(d) => d.set_data(encoding, data)?,
+ };
+ Ok(())
}
}
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index ebde79e6a7..387a0602a6 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -451,7 +451,7 @@ where
self.rep_level_decoder
.as_mut()
.unwrap()
- .set_data(rep_level_encoding, level_data);
+ .set_data(rep_level_encoding, level_data)?;
}
if max_def_level > 0 {
@@ -466,7 +466,7 @@ where
self.def_level_decoder
.as_mut()
.unwrap()
- .set_data(def_level_encoding, level_data);
+ .set_data(def_level_encoding, level_data)?;
}
self.values_decoder.set_data(
@@ -512,7 +512,7 @@ where
self.rep_level_decoder.as_mut().unwrap().set_data(
Encoding::RLE,
buf.slice(..rep_levels_byte_len as usize),
- );
+ )?;
}
// DataPage v2 only supports RLE encoding for
definition
@@ -524,7 +524,7 @@ where
rep_levels_byte_len as usize
..(rep_levels_byte_len +
def_levels_byte_len) as usize,
),
- );
+ )?;
}
self.values_decoder.set_data(
diff --git a/parquet/src/column/reader/decoder.rs
b/parquet/src/column/reader/decoder.rs
index e499062075..053db813ce 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -32,7 +32,7 @@ pub trait ColumnLevelDecoder {
type Buffer;
/// Set data for this [`ColumnLevelDecoder`]
- fn set_data(&mut self, encoding: Encoding, data: Bytes);
+ fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()>;
}
pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
@@ -266,15 +266,15 @@ enum LevelDecoder {
}
impl LevelDecoder {
- fn new(encoding: Encoding, data: Bytes, bit_width: u8) -> Self {
+ fn new(encoding: Encoding, data: Bytes, bit_width: u8) -> Result<Self> {
match encoding {
Encoding::RLE => {
let mut decoder = RleDecoder::new(bit_width);
- decoder.set_data(data);
- Self::Rle(decoder)
+ decoder.set_data(data)?;
+ Ok(Self::Rle(decoder))
}
#[allow(deprecated)]
- Encoding::BIT_PACKED => Self::Packed(BitReader::new(data),
bit_width),
+ Encoding::BIT_PACKED => Ok(Self::Packed(BitReader::new(data),
bit_width)),
_ => unreachable!("invalid level encoding: {}", encoding),
}
}
@@ -310,8 +310,9 @@ impl DefinitionLevelDecoderImpl {
impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
type Buffer = Vec<i16>;
- fn set_data(&mut self, encoding: Encoding, data: Bytes) {
- self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width))
+ fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> {
+ self.decoder = Some(LevelDecoder::new(encoding, data,
self.bit_width)?);
+ Ok(())
}
}
@@ -413,10 +414,11 @@ impl RepetitionLevelDecoderImpl {
impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
type Buffer = Vec<i16>;
- fn set_data(&mut self, encoding: Encoding, data: Bytes) {
- self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
+ fn set_data(&mut self, encoding: Encoding, data: Bytes) -> Result<()> {
+ self.decoder = Some(LevelDecoder::new(encoding, data,
self.bit_width)?);
self.buffer_len = 0;
self.buffer_offset = 0;
+ Ok(())
}
}
@@ -499,14 +501,14 @@ mod tests {
let data = Bytes::from(encoder.consume());
let mut decoder = RepetitionLevelDecoderImpl::new(1);
- decoder.set_data(Encoding::RLE, data.clone());
+ decoder.set_data(Encoding::RLE, data.clone()).unwrap();
let (_, levels) = decoder.skip_rep_levels(100, 4).unwrap();
assert_eq!(levels, 4);
// The length of the final bit packed run is ambiguous, so without the
correct
// levels limit, it will decode zero padding
let mut decoder = RepetitionLevelDecoderImpl::new(1);
- decoder.set_data(Encoding::RLE, data);
+ decoder.set_data(Encoding::RLE, data).unwrap();
let (_, levels) = decoder.skip_rep_levels(100, 6).unwrap();
assert_eq!(levels, 6);
}
@@ -525,7 +527,7 @@ mod tests {
let data = Bytes::from(encoder.consume());
let mut decoder = RepetitionLevelDecoderImpl::new(5);
- decoder.set_data(Encoding::RLE, data);
+ decoder.set_data(Encoding::RLE, data).unwrap();
let total_records = encoded.iter().filter(|x| **x == 0).count();
let mut remaining_records = total_records;
diff --git a/parquet/src/encodings/decoding.rs
b/parquet/src/encodings/decoding.rs
index f5336ca7c0..8201b38753 100644
--- a/parquet/src/encodings/decoding.rs
+++ b/parquet/src/encodings/decoding.rs
@@ -393,7 +393,7 @@ impl<T: DataType> Decoder<T> for DictDecoder<T> {
));
}
let mut rle_decoder = RleDecoder::new(bit_width);
- rle_decoder.set_data(data.slice(1..));
+ rle_decoder.set_data(data.slice(1..))?;
self.num_values = num_values;
self.rle_decoder = Some(rle_decoder);
Ok(())
@@ -473,7 +473,7 @@ impl<T: DataType> Decoder<T> for RleValueDecoder<T> {
self.decoder = RleDecoder::new(1);
self.decoder
- .set_data(data.slice(I32_SIZE..I32_SIZE + data_size));
+ .set_data(data.slice(I32_SIZE..I32_SIZE + data_size))?;
self.values_left = num_values;
Ok(())
}
diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs
index 41c0501320..c95a46c634 100644
--- a/parquet/src/encodings/rle.rs
+++ b/parquet/src/encodings/rle.rs
@@ -321,14 +321,18 @@ impl RleDecoder {
}
#[inline]
- pub fn set_data(&mut self, data: Bytes) {
+ pub fn set_data(&mut self, data: Bytes) -> Result<()> {
if let Some(ref mut bit_reader) = self.bit_reader {
bit_reader.reset(data);
} else {
self.bit_reader = Some(BitReader::new(data));
}
- let _ = self.reload();
+ // Initialize decoder state. The boolean only reports whether the
first run contained data,
+ // and `get`/`get_batch` already interpret that result to drive
iteration. We only need
+ // errors propagated here, so the flag returned is intentionally
ignored.
+ let _ = self.reload()?;
+ Ok(())
}
// These functions inline badly, they tend to inline and then create very
large loop unrolls
@@ -339,7 +343,7 @@ impl RleDecoder {
assert!(size_of::<T>() <= 8);
while self.rle_left == 0 && self.bit_packed_left == 0 {
- if !self.reload() {
+ if !self.reload()? {
return Ok(None);
}
}
@@ -349,14 +353,17 @@ impl RleDecoder {
&self
.current_value
.as_mut()
- .expect("current_value should be Some")
+ .ok_or_else(|| general_err!("current_value should be
Some"))?
.to_ne_bytes(),
)?;
self.rle_left -= 1;
rle_value
} else {
// self.bit_packed_left > 0
- let bit_reader = self.bit_reader.as_mut().expect("bit_reader
should be Some");
+ let bit_reader = self
+ .bit_reader
+ .as_mut()
+ .ok_or_else(|| general_err!("bit_reader should be Some"))?;
let bit_packed_value = bit_reader
.get_value(self.bit_width as usize)
.ok_or_else(|| eof_err!("Not enough data for
'bit_packed_value'"))?;
@@ -383,7 +390,10 @@ impl RleDecoder {
} else if self.bit_packed_left > 0 {
let mut num_values =
cmp::min(buffer.len() - values_read, self.bit_packed_left
as usize);
- let bit_reader = self.bit_reader.as_mut().expect("bit_reader
should be set");
+ let bit_reader = self
+ .bit_reader
+ .as_mut()
+ .ok_or_else(|| ParquetError::General("bit_reader should be
set".into()))?;
num_values = bit_reader.get_batch::<T>(
&mut buffer[values_read..values_read + num_values],
@@ -396,7 +406,7 @@ impl RleDecoder {
}
self.bit_packed_left -= num_values as u32;
values_read += num_values;
- } else if !self.reload() {
+ } else if !self.reload()? {
break;
}
}
@@ -415,7 +425,10 @@ impl RleDecoder {
} else if self.bit_packed_left > 0 {
let mut num_values =
cmp::min(num_values - values_skipped, self.bit_packed_left
as usize);
- let bit_reader = self.bit_reader.as_mut().expect("bit_reader
should be set");
+ let bit_reader = self
+ .bit_reader
+ .as_mut()
+ .ok_or_else(|| general_err!("bit_reader should be set"))?;
num_values = bit_reader.skip(num_values, self.bit_width as
usize);
if num_values == 0 {
@@ -425,7 +438,7 @@ impl RleDecoder {
}
self.bit_packed_left -= num_values as u32;
values_skipped += num_values;
- } else if !self.reload() {
+ } else if !self.reload()? {
break;
}
}
@@ -459,7 +472,10 @@ impl RleDecoder {
self.rle_left -= num_values as u32;
values_read += num_values;
} else if self.bit_packed_left > 0 {
- let bit_reader = self.bit_reader.as_mut().expect("bit_reader
should be set");
+ let bit_reader = self
+ .bit_reader
+ .as_mut()
+ .ok_or_else(|| general_err!("bit_reader should be set"))?;
loop {
let to_read = index_buf
@@ -488,7 +504,7 @@ impl RleDecoder {
break;
}
}
- } else if !self.reload() {
+ } else if !self.reload()? {
break;
}
}
@@ -497,15 +513,18 @@ impl RleDecoder {
}
#[inline]
- fn reload(&mut self) -> bool {
- let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be
set");
+ fn reload(&mut self) -> Result<bool> {
+ let bit_reader = self
+ .bit_reader
+ .as_mut()
+ .ok_or_else(|| general_err!("bit_reader should be set"))?;
if let Some(indicator_value) = bit_reader.get_vlq_int() {
// fastparquet adds padding to the end of pages. This is not
spec-compliant
// but is handled by the C++ implementation
//
<https://github.com/apache/arrow/blob/8074496cb41bc8ec8fe9fc814ca5576d89a6eb94/cpp/src/arrow/util/rle_encoding.h#L653>
if indicator_value == 0 {
- return false;
+ return Ok(false);
}
if indicator_value & 1 == 1 {
self.bit_packed_left = ((indicator_value >> 1) * 8) as u32;
@@ -513,14 +532,13 @@ impl RleDecoder {
self.rle_left = (indicator_value >> 1) as u32;
let value_width = bit_util::ceil(self.bit_width as usize, 8);
self.current_value =
bit_reader.get_aligned::<u64>(value_width);
- assert!(
- self.current_value.is_some(),
- "parquet_data_error: not enough data for RLE decoding"
- );
+ self.current_value.ok_or_else(|| {
+ general_err!("parquet_data_error: not enough data for RLE
decoding")
+ })?;
}
- true
+ Ok(true)
} else {
- false
+ Ok(false)
}
}
}
@@ -540,7 +558,7 @@ mod tests {
// 00000011 10001000 11000110 11111010
let data = vec![0x03, 0x88, 0xC6, 0xFA];
let mut decoder: RleDecoder = RleDecoder::new(3);
- decoder.set_data(data.into());
+ decoder.set_data(data.into()).unwrap();
let mut buffer = vec![0; 8];
let expected = vec![0, 1, 2, 3, 4, 5, 6, 7];
let result = decoder.get_batch::<i32>(&mut buffer);
@@ -554,7 +572,7 @@ mod tests {
// 00000011 10001000 11000110 11111010
let data = vec![0x03, 0x88, 0xC6, 0xFA];
let mut decoder: RleDecoder = RleDecoder::new(3);
- decoder.set_data(data.into());
+ decoder.set_data(data.into()).unwrap();
let expected = vec![2, 3, 4, 5, 6, 7];
let skipped = decoder.skip(2).expect("skipping values");
assert_eq!(skipped, 2);
@@ -595,7 +613,7 @@ mod tests {
];
let mut decoder: RleDecoder = RleDecoder::new(1);
- decoder.set_data(data1.into());
+ decoder.set_data(data1.into()).unwrap();
let mut buffer = vec![false; 100];
let mut expected = vec![];
for i in 0..100 {
@@ -609,7 +627,7 @@ mod tests {
assert!(result.is_ok());
assert_eq!(buffer, expected);
- decoder.set_data(data2.into());
+ decoder.set_data(data2.into()).unwrap();
let mut buffer = vec![false; 100];
let mut expected = vec![];
for i in 0..100 {
@@ -638,7 +656,7 @@ mod tests {
];
let mut decoder: RleDecoder = RleDecoder::new(1);
- decoder.set_data(data1.into());
+ decoder.set_data(data1.into()).unwrap();
let mut buffer = vec![true; 50];
let expected = vec![false; 50];
@@ -650,7 +668,7 @@ mod tests {
assert_eq!(remainder, 50);
assert_eq!(buffer, expected);
- decoder.set_data(data2.into());
+ decoder.set_data(data2.into()).unwrap();
let mut buffer = vec![false; 50];
let mut expected = vec![];
for i in 0..50 {
@@ -676,7 +694,7 @@ mod tests {
let dict = vec![10, 20, 30];
let data = vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02];
let mut decoder: RleDecoder = RleDecoder::new(3);
- decoder.set_data(data.into());
+ decoder.set_data(data.into()).unwrap();
let mut buffer = vec![0; 12];
let expected = vec![10, 10, 10, 20, 20, 20, 20, 30, 30, 30, 30, 30];
let result = decoder.get_batch_with_dict::<i32>(&dict, &mut buffer,
12);
@@ -689,7 +707,7 @@ mod tests {
let dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"];
let data = vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B];
let mut decoder: RleDecoder = RleDecoder::new(3);
- decoder.set_data(data.into());
+ decoder.set_data(data.into()).unwrap();
let mut buffer = vec![""; 12];
let expected = vec![
"ddd", "eee", "fff", "ddd", "eee", "fff", "ddd", "eee", "fff",
"eee", "fff", "fff",
@@ -707,7 +725,7 @@ mod tests {
let dict = vec![10, 20, 30];
let data = vec![0x06, 0x00, 0x08, 0x01, 0x0A, 0x02];
let mut decoder: RleDecoder = RleDecoder::new(3);
- decoder.set_data(data.into());
+ decoder.set_data(data.into()).unwrap();
let mut buffer = vec![0; 10];
let expected = vec![10, 20, 20, 20, 20, 30, 30, 30, 30, 30];
let skipped = decoder.skip(2).expect("skipping two values");
@@ -724,7 +742,7 @@ mod tests {
let dict = vec!["aaa", "bbb", "ccc", "ddd", "eee", "fff"];
let data = vec![0x03, 0x63, 0xC7, 0x8E, 0x03, 0x65, 0x0B];
let mut decoder: RleDecoder = RleDecoder::new(3);
- decoder.set_data(data.into());
+ decoder.set_data(data.into()).unwrap();
let mut buffer = vec![""; 8];
let expected = vec!["eee", "fff", "ddd", "eee", "fff", "eee", "fff",
"fff"];
let skipped = decoder.skip(4).expect("skipping four values");
@@ -757,7 +775,7 @@ mod tests {
// Verify read
let mut decoder = RleDecoder::new(bit_width);
- decoder.set_data(buffer.clone());
+ decoder.set_data(buffer.clone()).unwrap();
for v in values {
let val: i64 = decoder
.get()
@@ -767,7 +785,7 @@ mod tests {
}
// Verify batch read
- decoder.set_data(buffer);
+ decoder.set_data(buffer).unwrap();
let mut values_read: Vec<i64> = vec![0; values.len()];
decoder
.get_batch(&mut values_read[..])
@@ -872,7 +890,7 @@ mod tests {
let data: Bytes = data.into();
let mut decoder = RleDecoder::new(8);
- decoder.set_data(data.clone());
+ decoder.set_data(data.clone()).unwrap();
let mut output = vec![0_u16; 100];
let read = decoder.get_batch(&mut output).unwrap();
@@ -881,7 +899,7 @@ mod tests {
assert!(output.iter().take(20).all(|x| *x == 255));
// Reset decoder
- decoder.set_data(data);
+ decoder.set_data(data).unwrap();
let dict: Vec<u16> = (0..256).collect();
let mut output = vec![0_u16; 100];
@@ -907,7 +925,7 @@ mod tests {
buffer.push(0);
let mut decoder = RleDecoder::new(bit_width);
- decoder.set_data(buffer.into());
+ decoder.set_data(buffer.into()).unwrap();
// We don't always reliably know how many non-null values are
contained in a page
// and so the decoder must work correctly without a precise value count
@@ -947,14 +965,14 @@ mod tests {
let buffer: Bytes = writer.consume().into();
let mut decoder = RleDecoder::new(1);
- decoder.set_data(buffer.clone());
+ decoder.set_data(buffer.clone()).unwrap();
let mut decoded: Vec<i16> = vec![0; num_values];
let r = decoder.get_batch(&mut decoded).unwrap();
assert_eq!(r, num_values);
assert_eq!(vec![1; num_values], decoded);
- decoder.set_data(buffer);
+ decoder.set_data(buffer).unwrap();
let r = decoder
.get_batch_with_dict(&[0, 23], &mut decoded, num_values)
.unwrap();
@@ -973,7 +991,7 @@ mod tests {
}
let buffer = encoder.consume();
let mut decoder = RleDecoder::new(bit_width);
- decoder.set_data(Bytes::from(buffer));
+ decoder.set_data(Bytes::from(buffer)).unwrap();
let mut actual_values: Vec<i16> = vec![0; values.len()];
decoder
.get_batch(&mut actual_values)
@@ -992,7 +1010,7 @@ mod tests {
// Verify read
let mut decoder = RleDecoder::new(bit_width);
- decoder.set_data(buffer.clone());
+ decoder.set_data(buffer.clone()).unwrap();
for v in values {
let val = decoder
.get::<i32>()
@@ -1003,7 +1021,7 @@ mod tests {
// Verify batch read
let mut decoder = RleDecoder::new(bit_width);
- decoder.set_data(buffer);
+ decoder.set_data(buffer).unwrap();
let mut values_read: Vec<i32> = vec![0; values.len()];
decoder
.get_batch(&mut values_read[..])