This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 9a1e8b572d Use Vec in ColumnReader (#5177) (#5193)
9a1e8b572d is described below
commit 9a1e8b572d11078e813fffe3d5c7106b6953d58c
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Dec 15 18:25:40 2023 +0000
Use Vec in ColumnReader (#5177) (#5193)
* Use Vec in ColumnReader (#5177)
* Update parquet_derive
---
parquet/src/arrow/array_reader/byte_array.rs | 34 ++---
.../arrow/array_reader/byte_array_dictionary.rs | 45 +++---
.../src/arrow/array_reader/fixed_len_byte_array.rs | 81 ++++------
parquet/src/arrow/array_reader/mod.rs | 4 +-
parquet/src/arrow/array_reader/null_array.rs | 10 +-
parquet/src/arrow/array_reader/primitive_array.rs | 8 +-
parquet/src/arrow/buffer/dictionary_buffer.rs | 45 +-----
parquet/src/arrow/buffer/offset_buffer.rs | 28 +---
parquet/src/arrow/record_reader/buffer.rs | 67 +-------
.../src/arrow/record_reader/definition_levels.rs | 57 +++----
parquet/src/arrow/record_reader/mod.rs | 72 +++------
parquet/src/column/reader.rs | 170 +++++----------------
parquet/src/column/reader/decoder.rs | 138 +++++++----------
parquet/src/column/writer/mod.rs | 43 ++----
parquet/src/file/serialized_reader.rs | 6 +-
parquet/src/file/writer.rs | 4 +-
parquet/src/record/triplet.rs | 17 ++-
parquet_derive/src/parquet_field.rs | 48 ++----
18 files changed, 266 insertions(+), 611 deletions(-)
diff --git a/parquet/src/arrow/array_reader/byte_array.rs
b/parquet/src/arrow/array_reader/byte_array.rs
index debe0d6109..19086878c1 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -31,11 +31,10 @@ use crate::schema::types::ColumnDescPtr;
use arrow_array::{
Array, ArrayRef, BinaryArray, Decimal128Array, Decimal256Array,
OffsetSizeTrait,
};
-use arrow_buffer::{i256, Buffer};
+use arrow_buffer::i256;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;
use std::any::Any;
-use std::ops::Range;
use std::sync::Arc;
/// Returns an [`ArrayReader`] that decodes the provided byte array column
@@ -79,8 +78,8 @@ pub fn make_byte_array_reader(
struct ByteArrayReader<I: OffsetSizeTrait> {
data_type: ArrowType,
pages: Box<dyn PageIterator>,
- def_levels_buffer: Option<Buffer>,
- rep_levels_buffer: Option<Buffer>,
+ def_levels_buffer: Option<Vec<i16>>,
+ rep_levels_buffer: Option<Vec<i16>>,
record_reader: GenericRecordReader<OffsetBuffer<I>,
ByteArrayColumnValueDecoder<I>>,
}
@@ -154,11 +153,11 @@ impl<I: OffsetSizeTrait> ArrayReader for
ByteArrayReader<I> {
}
fn get_def_levels(&self) -> Option<&[i16]> {
- self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+ self.def_levels_buffer.as_deref()
}
fn get_rep_levels(&self) -> Option<&[i16]> {
- self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+ self.rep_levels_buffer.as_deref()
}
}
@@ -170,7 +169,7 @@ struct ByteArrayColumnValueDecoder<I: OffsetSizeTrait> {
}
impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayColumnValueDecoder<I>
{
- type Slice = OffsetBuffer<I>;
+ type Buffer = OffsetBuffer<I>;
fn new(desc: &ColumnDescPtr) -> Self {
let validate_utf8 = desc.converted_type() == ConvertedType::UTF8;
@@ -227,13 +226,13 @@ impl<I: OffsetSizeTrait> ColumnValueDecoder for
ByteArrayColumnValueDecoder<I> {
Ok(())
}
- fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) ->
Result<usize> {
+ fn read(&mut self, out: &mut Self::Buffer, num_values: usize) ->
Result<usize> {
let decoder = self
.decoder
.as_mut()
.ok_or_else(|| general_err!("no decoder set"))?;
- decoder.read(out, range.end - range.start, self.dict.as_ref())
+ decoder.read(out, num_values, self.dict.as_ref())
}
fn skip_values(&mut self, num_values: usize) -> Result<usize> {
@@ -590,6 +589,7 @@ mod tests {
use crate::arrow::array_reader::test_util::{byte_array_all_encodings,
utf8_column};
use crate::arrow::record_reader::buffer::ValuesBuffer;
use arrow_array::{Array, StringArray};
+ use arrow_buffer::Buffer;
#[test]
fn test_byte_array_decoder() {
@@ -607,20 +607,20 @@ mod tests {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
- assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1);
+ assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
assert_eq!(output.values.as_slice(), "hello".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5]);
- assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1);
+ assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
assert_eq!(output.values.as_slice(), "helloworld".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5, 10]);
- assert_eq!(decoder.read(&mut output, 2..4).unwrap(), 2);
+ assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
assert_eq!(output.values.as_slice(), "helloworldab".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5, 10, 11, 12]);
- assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);
+ assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);
let valid = [false, false, true, true, false, true, true, false,
false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
@@ -662,7 +662,7 @@ mod tests {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
- assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1);
+ assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
assert_eq!(output.values.as_slice(), "hello".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5]);
@@ -670,11 +670,11 @@ mod tests {
assert_eq!(decoder.skip_values(1).unwrap(), 1);
assert_eq!(decoder.skip_values(1).unwrap(), 1);
- assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1);
+ assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
assert_eq!(output.values.as_slice(), "hellob".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5, 6]);
- assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);
+ assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);
let valid = [false, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
@@ -705,7 +705,7 @@ mod tests {
for (encoding, page) in pages.clone() {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, None).unwrap();
- assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0);
+ assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);
}
// test nulls skip
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index a381223541..3678f24621 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -17,18 +17,16 @@
use std::any::Any;
use std::marker::PhantomData;
-use std::ops::Range;
use std::sync::Arc;
use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
-use arrow_buffer::{ArrowNativeType, Buffer};
+use arrow_buffer::ArrowNativeType;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;
use crate::arrow::array_reader::byte_array::{ByteArrayDecoder,
ByteArrayDecoderPlain};
use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::{dictionary_buffer::DictionaryBuffer,
offset_buffer::OffsetBuffer};
-use crate::arrow::record_reader::buffer::BufferQueue;
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
@@ -124,8 +122,8 @@ pub fn make_byte_array_dictionary_reader(
struct ByteArrayDictionaryReader<K: ArrowNativeType, V: OffsetSizeTrait> {
data_type: ArrowType,
pages: Box<dyn PageIterator>,
- def_levels_buffer: Option<Buffer>,
- rep_levels_buffer: Option<Buffer>,
+ def_levels_buffer: Option<Vec<i16>>,
+ rep_levels_buffer: Option<Vec<i16>>,
record_reader: GenericRecordReader<DictionaryBuffer<K, V>,
DictionaryDecoder<K, V>>,
}
@@ -183,11 +181,11 @@ where
}
fn get_def_levels(&self) -> Option<&[i16]> {
- self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+ self.def_levels_buffer.as_deref()
}
fn get_rep_levels(&self) -> Option<&[i16]> {
- self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+ self.rep_levels_buffer.as_deref()
}
}
@@ -224,7 +222,7 @@ where
K: FromBytes + Ord + ArrowNativeType,
V: OffsetSizeTrait,
{
- type Slice = DictionaryBuffer<K, V>;
+ type Buffer = DictionaryBuffer<K, V>;
fn new(col: &ColumnDescPtr) -> Self {
let validate_utf8 = col.converted_type() == ConvertedType::UTF8;
@@ -306,16 +304,16 @@ where
Ok(())
}
- fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) ->
Result<usize> {
+ fn read(&mut self, out: &mut Self::Buffer, num_values: usize) ->
Result<usize> {
match self.decoder.as_mut().expect("decoder set") {
MaybeDictionaryDecoder::Fallback(decoder) => {
- decoder.read(out.spill_values()?, range.end - range.start,
None)
+ decoder.read(out.spill_values()?, num_values, None)
}
MaybeDictionaryDecoder::Dict {
decoder,
max_remaining_values,
} => {
- let len = (range.end - range.start).min(*max_remaining_values);
+ let len = num_values.min(*max_remaining_values);
let dict = self
.dict
@@ -332,8 +330,12 @@ where
Some(keys) => {
// Happy path - can just copy keys
// Keys will be validated on conversion to arrow
- let keys_slice = keys.get_output_slice(len);
- let len = decoder.get_batch(keys_slice)?;
+
+ // TODO: Push vec into decoder (#5177)
+ let start = keys.len();
+ keys.resize(start + len, K::default());
+ let len = decoder.get_batch(&mut keys[start..])?;
+ keys.truncate(start + len);
*max_remaining_values -= len;
Ok(len)
}
@@ -381,6 +383,7 @@ where
mod tests {
use arrow::compute::cast;
use arrow_array::{Array, StringArray};
+ use arrow_buffer::Buffer;
use crate::arrow::array_reader::test_util::{
byte_array_all_encodings, encode_dictionary, utf8_column,
@@ -416,7 +419,7 @@ mod tests {
.unwrap();
let mut output = DictionaryBuffer::<i32, i32>::default();
- assert_eq!(decoder.read(&mut output, 0..3).unwrap(), 3);
+ assert_eq!(decoder.read(&mut output, 3).unwrap(), 3);
let mut valid = vec![false, false, true, true, false, true];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
@@ -424,7 +427,7 @@ mod tests {
assert!(matches!(output, DictionaryBuffer::Dict { .. }));
- assert_eq!(decoder.read(&mut output, 0..4).unwrap(), 4);
+ assert_eq!(decoder.read(&mut output, 4).unwrap(), 4);
valid.extend_from_slice(&[false, false, true, true, false, true, true,
false]);
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
@@ -484,17 +487,17 @@ mod tests {
let mut output = DictionaryBuffer::<i32, i32>::default();
// read two skip one
- assert_eq!(decoder.read(&mut output, 0..2).unwrap(), 2);
+ assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
assert_eq!(decoder.skip_values(1).unwrap(), 1);
assert!(matches!(output, DictionaryBuffer::Dict { .. }));
// read two skip one
- assert_eq!(decoder.read(&mut output, 2..4).unwrap(), 2);
+ assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
assert_eq!(decoder.skip_values(1).unwrap(), 1);
// read one and test on skip at the end
- assert_eq!(decoder.read(&mut output, 4..5).unwrap(), 1);
+ assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
assert_eq!(decoder.skip_values(4).unwrap(), 0);
let valid = [true, true, true, true, true];
@@ -536,7 +539,7 @@ mod tests {
for (encoding, page) in pages {
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
- assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 4);
+ assert_eq!(decoder.read(&mut output, 1024).unwrap(), 4);
}
let array = output.into_array(None, &data_type).unwrap();
assert_eq!(array.data_type(), &data_type);
@@ -580,7 +583,7 @@ mod tests {
for (encoding, page) in pages {
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
decoder.skip_values(2).expect("skipping two values");
- assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 2);
+ assert_eq!(decoder.read(&mut output, 1024).unwrap(), 2);
}
let array = output.into_array(None, &data_type).unwrap();
assert_eq!(array.data_type(), &data_type);
@@ -641,7 +644,7 @@ mod tests {
for (encoding, page) in pages.clone() {
let mut output = DictionaryBuffer::<i32, i32>::default();
decoder.set_data(encoding, page, 8, None).unwrap();
- assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0);
+ assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);
output.pad_nulls(0, 0, 8, &[0]);
let array = output
diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index 849aa37c56..a0d25d403c 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -18,12 +18,12 @@
use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::bit_util::{iter_set_bits_rev, sign_extend_be};
use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
-use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer};
+use crate::arrow::record_reader::buffer::ValuesBuffer;
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{Encoding, Type};
use crate::column::page::PageIterator;
-use crate::column::reader::decoder::{ColumnValueDecoder, ValuesBufferSlice};
+use crate::column::reader::decoder::ColumnValueDecoder;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow_array::{
@@ -36,7 +36,6 @@ use arrow_schema::{DataType as ArrowType, IntervalUnit};
use bytes::Bytes;
use half::f16;
use std::any::Any;
-use std::ops::Range;
use std::sync::Arc;
/// Returns an [`ArrayReader`] that decodes the provided fixed length byte
array column
@@ -117,8 +116,8 @@ struct FixedLenByteArrayReader {
data_type: ArrowType,
byte_length: usize,
pages: Box<dyn PageIterator>,
- def_levels_buffer: Option<Buffer>,
- rep_levels_buffer: Option<Buffer>,
+ def_levels_buffer: Option<Vec<i16>>,
+ rep_levels_buffer: Option<Vec<i16>>,
record_reader: GenericRecordReader<FixedLenByteArrayBuffer, ValueDecoder>,
}
@@ -135,13 +134,7 @@ impl FixedLenByteArrayReader {
pages,
def_levels_buffer: None,
rep_levels_buffer: None,
- record_reader: GenericRecordReader::new_with_records(
- column_desc,
- FixedLenByteArrayBuffer {
- buffer: Default::default(),
- byte_length,
- },
- ),
+ record_reader: GenericRecordReader::new(column_desc),
}
}
}
@@ -164,7 +157,7 @@ impl ArrayReader for FixedLenByteArrayReader {
let array_data =
ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
.len(self.record_reader.num_values())
- .add_buffer(record_data)
+ .add_buffer(Buffer::from_vec(record_data.buffer))
.null_bit_buffer(self.record_reader.consume_bitmap_buffer());
let binary = FixedSizeBinaryArray::from(unsafe {
array_data.build_unchecked() });
@@ -231,41 +224,19 @@ impl ArrayReader for FixedLenByteArrayReader {
}
fn get_def_levels(&self) -> Option<&[i16]> {
- self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+ self.def_levels_buffer.as_deref()
}
fn get_rep_levels(&self) -> Option<&[i16]> {
- self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+ self.rep_levels_buffer.as_deref()
}
}
+#[derive(Default)]
struct FixedLenByteArrayBuffer {
buffer: Vec<u8>,
/// The length of each element in bytes
- byte_length: usize,
-}
-
-impl ValuesBufferSlice for FixedLenByteArrayBuffer {
- fn capacity(&self) -> usize {
- usize::MAX
- }
-}
-
-impl BufferQueue for FixedLenByteArrayBuffer {
- type Output = Buffer;
- type Slice = Self;
-
- fn consume(&mut self) -> Self::Output {
- Buffer::from_vec(self.buffer.consume())
- }
-
- fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice {
- self
- }
-
- fn truncate_buffer(&mut self, len: usize) {
- assert_eq!(self.buffer.len(), len * self.byte_length);
- }
+ byte_length: Option<usize>,
}
impl ValuesBuffer for FixedLenByteArrayBuffer {
@@ -276,12 +247,11 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
levels_read: usize,
valid_mask: &[u8],
) {
- assert_eq!(
- self.buffer.len(),
- (read_offset + values_read) * self.byte_length
- );
+ let byte_length = self.byte_length.unwrap_or_default();
+
+ assert_eq!(self.buffer.len(), (read_offset + values_read) *
byte_length);
self.buffer
- .resize((read_offset + levels_read) * self.byte_length, 0);
+ .resize((read_offset + levels_read) * byte_length, 0);
let values_range = read_offset..read_offset + values_read;
for (value_pos, level_pos) in
values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
@@ -290,10 +260,10 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
break;
}
- let level_pos_bytes = level_pos * self.byte_length;
- let value_pos_bytes = value_pos * self.byte_length;
+ let level_pos_bytes = level_pos * byte_length;
+ let value_pos_bytes = value_pos * byte_length;
- for i in 0..self.byte_length {
+ for i in 0..byte_length {
self.buffer[level_pos_bytes + i] = self.buffer[value_pos_bytes
+ i]
}
}
@@ -307,7 +277,7 @@ struct ValueDecoder {
}
impl ColumnValueDecoder for ValueDecoder {
- type Slice = FixedLenByteArrayBuffer;
+ type Buffer = FixedLenByteArrayBuffer;
fn new(col: &ColumnDescPtr) -> Self {
Self {
@@ -374,13 +344,16 @@ impl ColumnValueDecoder for ValueDecoder {
Ok(())
}
- fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) ->
Result<usize> {
- assert_eq!(self.byte_length, out.byte_length);
+ fn read(&mut self, out: &mut Self::Buffer, num_values: usize) ->
Result<usize> {
+ match out.byte_length {
+ Some(x) => assert_eq!(x, self.byte_length),
+ None => out.byte_length = Some(self.byte_length),
+ }
- let len = range.end - range.start;
match self.decoder.as_mut().unwrap() {
Decoder::Plain { offset, buf } => {
- let to_read = (len * self.byte_length).min(buf.len() -
*offset) / self.byte_length;
+ let to_read =
+ (num_values * self.byte_length).min(buf.len() - *offset) /
self.byte_length;
let end_offset = *offset + to_read * self.byte_length;
out.buffer
.extend_from_slice(&buf.as_ref()[*offset..end_offset]);
@@ -394,7 +367,7 @@ impl ColumnValueDecoder for ValueDecoder {
return Ok(0);
}
- decoder.read(len, |keys| {
+ decoder.read(num_values, |keys| {
out.buffer.reserve(keys.len() * self.byte_length);
for key in keys {
let offset = *key as usize * self.byte_length;
@@ -405,7 +378,7 @@ impl ColumnValueDecoder for ValueDecoder {
})
}
Decoder::Delta { decoder } => {
- let to_read = len.min(decoder.remaining());
+ let to_read = num_values.min(decoder.remaining());
out.buffer.reserve(to_read * self.byte_length);
decoder.read(to_read, |slice| {
diff --git a/parquet/src/arrow/array_reader/mod.rs
b/parquet/src/arrow/array_reader/mod.rs
index a4ee504059..c4e9fc5fa0 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -129,7 +129,7 @@ fn read_records<V, CV>(
) -> Result<usize>
where
V: ValuesBuffer,
- CV: ColumnValueDecoder<Slice = V::Slice>,
+ CV: ColumnValueDecoder<Buffer = V>,
{
let mut records_read = 0usize;
while records_read < batch_size {
@@ -163,7 +163,7 @@ fn skip_records<V, CV>(
) -> Result<usize>
where
V: ValuesBuffer,
- CV: ColumnValueDecoder<Slice = V::Slice>,
+ CV: ColumnValueDecoder<Buffer = V>,
{
let mut records_skipped = 0usize;
while records_skipped < batch_size {
diff --git a/parquet/src/arrow/array_reader/null_array.rs
b/parquet/src/arrow/array_reader/null_array.rs
index bb32fb307f..838db854e0 100644
--- a/parquet/src/arrow/array_reader/null_array.rs
+++ b/parquet/src/arrow/array_reader/null_array.rs
@@ -22,7 +22,7 @@ use crate::data_type::DataType;
use crate::errors::Result;
use crate::schema::types::ColumnDescPtr;
use arrow_array::ArrayRef;
-use arrow_buffer::{ArrowNativeType, Buffer};
+use arrow_buffer::ArrowNativeType;
use arrow_schema::DataType as ArrowType;
use std::any::Any;
use std::sync::Arc;
@@ -36,8 +36,8 @@ where
{
data_type: ArrowType,
pages: Box<dyn PageIterator>,
- def_levels_buffer: Option<Buffer>,
- rep_levels_buffer: Option<Buffer>,
+ def_levels_buffer: Option<Vec<i16>>,
+ rep_levels_buffer: Option<Vec<i16>>,
record_reader: RecordReader<T>,
}
@@ -99,10 +99,10 @@ where
}
fn get_def_levels(&self) -> Option<&[i16]> {
- self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+ self.def_levels_buffer.as_deref()
}
fn get_rep_levels(&self) -> Option<&[i16]> {
- self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+ self.rep_levels_buffer.as_deref()
}
}
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs
b/parquet/src/arrow/array_reader/primitive_array.rs
index 507b6215ca..07ecc27d9b 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -77,8 +77,8 @@ where
{
data_type: ArrowType,
pages: Box<dyn PageIterator>,
- def_levels_buffer: Option<Buffer>,
- rep_levels_buffer: Option<Buffer>,
+ def_levels_buffer: Option<Vec<i16>>,
+ rep_levels_buffer: Option<Vec<i16>>,
record_reader: RecordReader<T>,
}
@@ -287,11 +287,11 @@ where
}
fn get_def_levels(&self) -> Option<&[i16]> {
- self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+ self.def_levels_buffer.as_deref()
}
fn get_rep_levels(&self) -> Option<&[i16]> {
- self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+ self.rep_levels_buffer.as_deref()
}
}
diff --git a/parquet/src/arrow/buffer/dictionary_buffer.rs
b/parquet/src/arrow/buffer/dictionary_buffer.rs
index d0f63024ed..9e5b2293aa 100644
--- a/parquet/src/arrow/buffer/dictionary_buffer.rs
+++ b/parquet/src/arrow/buffer/dictionary_buffer.rs
@@ -16,8 +16,7 @@
// under the License.
use crate::arrow::buffer::offset_buffer::OffsetBuffer;
-use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer};
-use crate::column::reader::decoder::ValuesBufferSlice;
+use crate::arrow::record_reader::buffer::ValuesBuffer;
use crate::errors::{ParquetError, Result};
use arrow_array::{make_array, Array, ArrayRef, OffsetSizeTrait};
use arrow_buffer::{ArrowNativeType, Buffer};
@@ -185,12 +184,6 @@ impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait>
DictionaryBuffer<K, V> {
}
}
-impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBufferSlice for
DictionaryBuffer<K, V> {
- fn capacity(&self) -> usize {
- usize::MAX
- }
-}
-
impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for
DictionaryBuffer<K, V> {
fn pad_nulls(
&mut self,
@@ -211,34 +204,6 @@ impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer
for DictionaryBuffer<K
}
}
-impl<K: ArrowNativeType, V: OffsetSizeTrait> BufferQueue for
DictionaryBuffer<K, V> {
- type Output = Self;
- type Slice = Self;
-
- fn consume(&mut self) -> Self::Output {
- match self {
- Self::Dict { keys, values } => Self::Dict {
- keys: std::mem::take(keys),
- values: values.clone(),
- },
- Self::Values { values } => Self::Values {
- values: values.consume(),
- },
- }
- }
-
- fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice {
- self
- }
-
- fn truncate_buffer(&mut self, len: usize) {
- match self {
- Self::Dict { keys, .. } => keys.truncate_buffer(len),
- Self::Values { values } => values.truncate_buffer(len),
- }
- }
-}
-
#[cfg(test)]
mod tests {
use super::*;
@@ -274,7 +239,7 @@ mod tests {
buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice());
assert_eq!(buffer.len(), 13);
- let split = buffer.consume();
+ let split = std::mem::take(&mut buffer);
let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
assert_eq!(array.data_type(), &dict_type);
@@ -309,7 +274,9 @@ mod tests {
.unwrap()
.extend_from_slice(&[0, 1, 0, 1]);
- let array = buffer.consume().into_array(None, &dict_type).unwrap();
+ let array = std::mem::take(&mut buffer)
+ .into_array(None, &dict_type)
+ .unwrap();
assert_eq!(array.data_type(), &dict_type);
let strings = cast(&array, &ArrowType::Utf8).unwrap();
@@ -320,7 +287,7 @@ mod tests {
);
// Can recreate with new dictionary as keys empty
- assert!(matches!(&buffer, DictionaryBuffer::Dict { .. }));
+ assert!(matches!(&buffer, DictionaryBuffer::Values { .. }));
assert_eq!(buffer.len(), 0);
let d3 = Arc::new(StringArray::from(vec!["bongo"])) as ArrayRef;
buffer.as_keys(&d3).unwrap().extend_from_slice(&[0, 0]);
diff --git a/parquet/src/arrow/buffer/offset_buffer.rs
b/parquet/src/arrow/buffer/offset_buffer.rs
index 459c94ed28..ce9eb1142a 100644
--- a/parquet/src/arrow/buffer/offset_buffer.rs
+++ b/parquet/src/arrow/buffer/offset_buffer.rs
@@ -16,8 +16,7 @@
// under the License.
use crate::arrow::buffer::bit_util::iter_set_bits_rev;
-use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer};
-use crate::column::reader::decoder::ValuesBufferSlice;
+use crate::arrow::record_reader::buffer::ValuesBuffer;
use crate::errors::{ParquetError, Result};
use arrow_array::{make_array, ArrayRef, OffsetSizeTrait};
use arrow_buffer::{ArrowNativeType, Buffer};
@@ -141,23 +140,6 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {
}
}
-impl<I: OffsetSizeTrait> BufferQueue for OffsetBuffer<I> {
- type Output = Self;
- type Slice = Self;
-
- fn consume(&mut self) -> Self::Output {
- std::mem::take(self)
- }
-
- fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice {
- self
- }
-
- fn truncate_buffer(&mut self, len: usize) {
- assert_eq!(self.offsets.len(), len + 1);
- }
-}
-
impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
fn pad_nulls(
&mut self,
@@ -208,12 +190,6 @@ impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
}
}
-impl<I: OffsetSizeTrait> ValuesBufferSlice for OffsetBuffer<I> {
- fn capacity(&self) -> usize {
- usize::MAX
- }
-}
-
#[cfg(test)]
mod tests {
use super::*;
@@ -250,7 +226,7 @@ mod tests {
for v in ["hello", "world", "cupcakes", "a", "b", "c"] {
buffer.try_push(v.as_bytes(), false).unwrap()
}
- let split = buffer.consume();
+ let split = std::mem::take(&mut buffer);
let array = split.into_array(None, ArrowType::Utf8);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
diff --git a/parquet/src/arrow/record_reader/buffer.rs
b/parquet/src/arrow/record_reader/buffer.rs
index 3914710ff7..880407a547 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -17,69 +17,8 @@
use crate::arrow::buffer::bit_util::iter_set_bits_rev;
-/// A buffer that supports writing new data to the end, and removing data from
the front
-///
-/// Used by [RecordReader](`super::RecordReader`) to buffer up values before
returning a
-/// potentially smaller number of values, corresponding to a whole number of
semantic records
-pub trait BufferQueue: Sized {
- type Output: Sized;
-
- type Slice: ?Sized;
-
- /// Consumes the contents of this [`BufferQueue`]
- fn consume(&mut self) -> Self::Output;
-
- /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can
be used
- /// to append data to the end of this [`BufferQueue`]
- ///
- /// NB: writes to the returned slice will not update the length of
[`BufferQueue`]
- /// instead a subsequent call should be made to
[`BufferQueue::truncate_buffer`]
- fn get_output_slice(&mut self, batch_size: usize) -> &mut Self::Slice;
-
- /// Sets the length of the [`BufferQueue`].
- ///
- /// Intended to be used in combination with
[`BufferQueue::get_output_slice`]
- ///
- /// # Panics
- ///
- /// Implementations must panic if `len` is beyond the initialized length
- ///
- /// Implementations may panic if `set_len` is called with less than what
has been written
- ///
- /// This distinction is to allow for implementations that return a default
initialized
- /// [BufferQueue::Slice`] which doesn't track capacity and length
separately
- ///
- /// For example, [`BufferQueue`] returns a default-initialized `&mut [T]`,
and does not
- /// track how much of this slice is actually written to by the caller.
This is still
- /// safe as the slice is default-initialized.
- ///
- fn truncate_buffer(&mut self, len: usize);
-}
-
-impl<T: Copy + Default> BufferQueue for Vec<T> {
- type Output = Self;
-
- type Slice = [T];
-
- fn consume(&mut self) -> Self::Output {
- std::mem::take(self)
- }
-
- fn get_output_slice(&mut self, batch_size: usize) -> &mut Self::Slice {
- let len = self.len();
- self.resize(len + batch_size, T::default());
- &mut self[len..]
- }
-
- fn truncate_buffer(&mut self, len: usize) {
- assert!(len <= self.len());
- self.truncate(len)
- }
-}
-
-/// A [`BufferQueue`] capable of storing column values
-pub trait ValuesBuffer: BufferQueue {
- ///
+/// A buffer that supports padding with nulls
+pub trait ValuesBuffer: Default {
/// If a column contains nulls, more level data may be read than value
data, as null
/// values are not encoded. Therefore, first the levels data is read, the
null count
/// determined, and then the corresponding number of values read to a
[`ValuesBuffer`].
@@ -111,7 +50,7 @@ impl<T: Copy + Default> ValuesBuffer for Vec<T> {
levels_read: usize,
valid_mask: &[u8],
) {
- assert!(self.len() >= read_offset + levels_read);
+ self.resize(read_offset + levels_read, T::default());
let values_range = read_offset..read_offset + values_read;
for (value_pos, level_pos) in
values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs
b/parquet/src/arrow/record_reader/definition_levels.rs
index fa041f5fdb..87f531df09 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use std::ops::Range;
-
use arrow_array::builder::BooleanBufferBuilder;
use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk;
use arrow_buffer::Buffer;
@@ -25,7 +23,7 @@ use bytes::Bytes;
use crate::arrow::buffer::bit_util::count_set_bits;
use crate::basic::Encoding;
use crate::column::reader::decoder::{
- ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
LevelsBufferSlice,
+ ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
@@ -85,18 +83,13 @@ impl DefinitionLevelBuffer {
}
/// Returns the built level data
- pub fn consume_levels(&mut self) -> Option<Buffer> {
+ pub fn consume_levels(&mut self) -> Option<Vec<i16>> {
match &mut self.inner {
- BufferInner::Full { levels, .. } =>
Some(Buffer::from_vec(std::mem::take(levels))),
+ BufferInner::Full { levels, .. } => Some(std::mem::take(levels)),
BufferInner::Mask { .. } => None,
}
}
- pub fn set_len(&mut self, len: usize) {
- assert_eq!(self.nulls().len(), len);
- self.len = len;
- }
-
/// Returns the built null bitmask
pub fn consume_bitmask(&mut self) -> Buffer {
self.len = 0;
@@ -114,18 +107,6 @@ impl DefinitionLevelBuffer {
}
}
-impl LevelsBufferSlice for DefinitionLevelBuffer {
- fn capacity(&self) -> usize {
- usize::MAX
- }
-
- fn count_nulls(&self, range: Range<usize>, _max_level: i16) -> usize {
- let total_count = range.end - range.start;
- let range = range.start + self.len..range.end + self.len;
- total_count - count_set_bits(self.nulls().as_slice(), range)
- }
-}
-
enum MaybePacked {
Packed(PackedDecoder),
Fallback(DefinitionLevelDecoderImpl),
@@ -148,7 +129,7 @@ impl DefinitionLevelBufferDecoder {
}
impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
- type Slice = DefinitionLevelBuffer;
+ type Buffer = DefinitionLevelBuffer;
fn set_data(&mut self, encoding: Encoding, data: Bytes) {
match &mut self.decoder {
@@ -159,7 +140,11 @@ impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
}
impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
- fn read_def_levels(&mut self, writer: &mut Self::Slice, range:
Range<usize>) -> Result<usize> {
+ fn read_def_levels(
+ &mut self,
+ writer: &mut Self::Buffer,
+ num_levels: usize,
+ ) -> Result<(usize, usize)> {
match (&mut writer.inner, &mut self.decoder) {
(
BufferInner::Full {
@@ -170,33 +155,33 @@ impl DefinitionLevelDecoder for
DefinitionLevelBufferDecoder {
MaybePacked::Fallback(decoder),
) => {
assert_eq!(self.max_level, *max_level);
- assert_eq!(range.start + writer.len, nulls.len());
- levels.resize(range.end + writer.len, 0);
-
- let slice = &mut levels[writer.len..];
- let levels_read = decoder.read_def_levels(slice,
range.clone())?;
+ let start = levels.len();
+ let (values_read, levels_read) =
decoder.read_def_levels(levels, num_levels)?;
nulls.reserve(levels_read);
- for i in &slice[range.start..range.start + levels_read] {
- nulls.append(i == max_level)
+ for i in &levels[start..] {
+ nulls.append(i == max_level);
}
- Ok(levels_read)
+ Ok((values_read, levels_read))
}
(BufferInner::Mask { nulls }, MaybePacked::Packed(decoder)) => {
assert_eq!(self.max_level, 1);
- assert_eq!(range.start + writer.len, nulls.len());
- decoder.read(nulls, range.end - range.start)
+ let start = nulls.len();
+ let levels_read = decoder.read(nulls, num_levels)?;
+
+ let values_read = count_set_bits(nulls.as_slice(),
start..start + levels_read);
+ Ok((values_read, levels_read))
}
_ => unreachable!("inconsistent null mask"),
}
}
- fn skip_def_levels(&mut self, num_levels: usize, max_def_level: i16) ->
Result<(usize, usize)> {
+ fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>
{
match &mut self.decoder {
- MaybePacked::Fallback(decoder) =>
decoder.skip_def_levels(num_levels, max_def_level),
+ MaybePacked::Fallback(decoder) =>
decoder.skip_def_levels(num_levels),
MaybePacked::Packed(decoder) => decoder.skip(num_levels),
}
}
diff --git a/parquet/src/arrow/record_reader/mod.rs
b/parquet/src/arrow/record_reader/mod.rs
index 49c69c87e3..7456da053b 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -18,7 +18,7 @@
use arrow_buffer::Buffer;
use crate::arrow::record_reader::{
- buffer::{BufferQueue, ValuesBuffer},
+ buffer::ValuesBuffer,
definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder},
};
use crate::column::reader::decoder::RepetitionLevelDecoderImpl;
@@ -62,28 +62,18 @@ pub struct GenericRecordReader<V, CV> {
impl<V, CV> GenericRecordReader<V, CV>
where
- V: ValuesBuffer + Default,
- CV: ColumnValueDecoder<Slice = V::Slice>,
+ V: ValuesBuffer,
+ CV: ColumnValueDecoder<Buffer = V>,
{
/// Create a new [`GenericRecordReader`]
pub fn new(desc: ColumnDescPtr) -> Self {
- Self::new_with_records(desc, V::default())
- }
-}
-
-impl<V, CV> GenericRecordReader<V, CV>
-where
- V: ValuesBuffer,
- CV: ColumnValueDecoder<Slice = V::Slice>,
-{
- pub fn new_with_records(desc: ColumnDescPtr, records: V) -> Self {
let def_levels = (desc.max_def_level() > 0)
.then(|| DefinitionLevelBuffer::new(&desc,
packed_null_mask(&desc)));
let rep_levels = (desc.max_rep_level() > 0).then(Vec::new);
Self {
- values: records,
+ values: V::default(),
def_levels,
rep_levels,
column_reader: None,
@@ -166,22 +156,20 @@ where
/// The implementation has side effects. It will create a new buffer to
hold those
/// definition level values that have already been read into memory but
not counted
/// as record values, e.g. those from `self.num_values` to
`self.values_written`.
- pub fn consume_def_levels(&mut self) -> Option<Buffer> {
+ pub fn consume_def_levels(&mut self) -> Option<Vec<i16>> {
self.def_levels.as_mut().and_then(|x| x.consume_levels())
}
/// Return repetition level data.
/// The side effect is similar to `consume_def_levels`.
- pub fn consume_rep_levels(&mut self) -> Option<Buffer> {
- self.rep_levels
- .as_mut()
- .map(|x| Buffer::from_vec(x.consume()))
+ pub fn consume_rep_levels(&mut self) -> Option<Vec<i16>> {
+ self.rep_levels.as_mut().map(std::mem::take)
}
/// Returns currently stored buffer data.
/// The side effect is similar to `consume_def_levels`.
- pub fn consume_record_data(&mut self) -> V::Output {
- self.values.consume()
+ pub fn consume_record_data(&mut self) -> V {
+ std::mem::take(&mut self.values)
}
/// Returns currently stored null bitmap data.
@@ -207,18 +195,13 @@ where
/// Try to read one batch of data returning the number of records read
fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
- let rep_levels = self
- .rep_levels
- .as_mut()
- .map(|levels| levels.get_output_slice(batch_size));
- let def_levels = self.def_levels.as_mut();
- let values = self.values.get_output_slice(batch_size);
-
- let (records_read, values_read, levels_read) = self
- .column_reader
- .as_mut()
- .unwrap()
- .read_records(batch_size, def_levels, rep_levels, values)?;
+ let (records_read, values_read, levels_read) =
+ self.column_reader.as_mut().unwrap().read_records(
+ batch_size,
+ self.def_levels.as_mut(),
+ self.rep_levels.as_mut(),
+ &mut self.values,
+ )?;
if values_read < levels_read {
let def_levels = self.def_levels.as_ref().ok_or_else(|| {
@@ -235,13 +218,6 @@ where
self.num_records += records_read;
self.num_values += levels_read;
- self.values.truncate_buffer(self.num_values);
- if let Some(ref mut buf) = self.rep_levels {
- buf.truncate_buffer(self.num_values)
- };
- if let Some(ref mut buf) = self.def_levels {
- buf.set_len(self.num_values)
- };
Ok(records_read)
}
}
@@ -258,7 +234,6 @@ mod tests {
use std::sync::Arc;
use arrow::buffer::Buffer;
- use arrow_array::builder::Int16BufferBuilder;
use crate::basic::Encoding;
use crate::data_type::Int32Type;
@@ -417,11 +392,8 @@ mod tests {
}
// Verify result def levels
- let mut bb = Int16BufferBuilder::new(7);
- bb.append_slice(&[1i16, 2i16, 0i16, 2i16, 2i16, 0i16, 2i16]);
- let expected_def_levels = bb.finish();
assert_eq!(
- Some(expected_def_levels),
+ Some(vec![1i16, 2i16, 0i16, 2i16, 2i16, 0i16, 2i16]),
record_reader.consume_def_levels()
);
@@ -526,11 +498,8 @@ mod tests {
}
// Verify result def levels
- let mut bb = Int16BufferBuilder::new(9);
- bb.append_slice(&[2i16, 0i16, 1i16, 2i16, 2i16, 2i16, 2i16, 2i16,
2i16]);
- let expected_def_levels = bb.finish();
assert_eq!(
- Some(expected_def_levels),
+ Some(vec![2i16, 0i16, 1i16, 2i16, 2i16, 2i16, 2i16, 2i16, 2i16]),
record_reader.consume_def_levels()
);
@@ -792,11 +761,8 @@ mod tests {
}
// Verify result def levels
- let mut bb = Int16BufferBuilder::new(7);
- bb.append_slice(&[0i16, 2i16, 2i16]);
- let expected_def_levels = bb.finish();
assert_eq!(
- Some(expected_def_levels),
+ Some(vec![0i16, 2i16, 2i16]),
record_reader.consume_def_levels()
);
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 6c712ead62..4f861917c9 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -23,7 +23,7 @@ use super::page::{Page, PageReader};
use crate::basic::*;
use crate::column::reader::decoder::{
ColumnValueDecoder, ColumnValueDecoderImpl, DefinitionLevelDecoder,
DefinitionLevelDecoderImpl,
- LevelsBufferSlice, RepetitionLevelDecoder, RepetitionLevelDecoderImpl,
ValuesBufferSlice,
+ RepetitionLevelDecoder, RepetitionLevelDecoderImpl,
};
use crate::data_type::*;
use crate::errors::{ParquetError, Result};
@@ -193,9 +193,9 @@ where
pub fn read_batch(
&mut self,
batch_size: usize,
- def_levels: Option<&mut D::Slice>,
- rep_levels: Option<&mut R::Slice>,
- values: &mut V::Slice,
+ def_levels: Option<&mut D::Buffer>,
+ rep_levels: Option<&mut R::Buffer>,
+ values: &mut V::Buffer,
) -> Result<(usize, usize)> {
let (_, values, levels) = self.read_records(batch_size, def_levels,
rep_levels, values)?;
@@ -219,41 +219,26 @@ where
pub fn read_records(
&mut self,
max_records: usize,
- mut def_levels: Option<&mut D::Slice>,
- mut rep_levels: Option<&mut R::Slice>,
- values: &mut V::Slice,
+ mut def_levels: Option<&mut D::Buffer>,
+ mut rep_levels: Option<&mut R::Buffer>,
+ values: &mut V::Buffer,
) -> Result<(usize, usize, usize)> {
- let mut max_levels = values.capacity().min(max_records);
- if let Some(ref levels) = def_levels {
- max_levels = max_levels.min(levels.capacity());
- }
- if let Some(ref levels) = rep_levels {
- max_levels = max_levels.min(levels.capacity())
- }
-
let mut total_records_read = 0;
let mut total_levels_read = 0;
let mut total_values_read = 0;
- while total_records_read < max_records
- && total_levels_read < max_levels
- && self.has_next()?
- {
+ while total_records_read < max_records && self.has_next()? {
let remaining_records = max_records - total_records_read;
let remaining_levels = self.num_buffered_values -
self.num_decoded_values;
- let levels_to_read = remaining_levels.min(max_levels -
total_levels_read);
- let (records_read, levels_read) = match
self.rep_level_decoder.as_mut() {
+ let (records_read, levels_to_read) = match
self.rep_level_decoder.as_mut() {
Some(reader) => {
let out = rep_levels
.as_mut()
.ok_or_else(|| general_err!("must specify repetition
levels"))?;
- let (mut records_read, levels_read) =
reader.read_rep_levels(
- out,
- total_levels_read..total_levels_read + levels_to_read,
- remaining_records,
- )?;
+ let (mut records_read, levels_read) =
+ reader.read_rep_levels(out, remaining_records,
remaining_levels)?;
if levels_read == remaining_levels &&
self.has_record_delimiter {
// Reached end of page, which implies records_read <
remaining_records
@@ -264,7 +249,7 @@ where
(records_read, levels_read)
}
None => {
- let min = remaining_records.min(levels_to_read);
+ let min = remaining_records.min(remaining_levels);
(min, min)
}
};
@@ -275,26 +260,18 @@ where
.as_mut()
.ok_or_else(|| general_err!("must specify definition
levels"))?;
- let read = reader
- .read_def_levels(out,
total_levels_read..total_levels_read + levels_read)?;
+ let (values_read, levels_read) =
reader.read_def_levels(out, levels_to_read)?;
- if read != levels_read {
+ if levels_read != levels_to_read {
return Err(general_err!("insufficient definition
levels read from column - expected {levels_to_read}, got {levels_read}"));
}
- let null_count = out.count_nulls(
- total_levels_read..total_levels_read + read,
- self.descr.max_def_level(),
- );
- levels_read - null_count
+ values_read
}
- None => levels_read,
+ None => levels_to_read,
};
- let values_read = self.values_decoder.read(
- values,
- total_values_read..total_values_read + values_to_read,
- )?;
+ let values_read = self.values_decoder.read(values,
values_to_read)?;
if values_read != values_to_read {
return Err(general_err!(
@@ -302,9 +279,9 @@ where
));
}
- self.num_decoded_values += levels_read;
+ self.num_decoded_values += levels_to_read;
total_records_read += records_read;
- total_levels_read += levels_read;
+ total_levels_read += levels_to_read;
total_values_read += values_read;
}
@@ -389,9 +366,7 @@ where
}
let (values_read, def_levels_read) = match
self.def_level_decoder.as_mut() {
- Some(decoder) => {
- decoder.skip_def_levels(rep_levels_read,
self.descr.max_def_level())?
- }
+ Some(decoder) => decoder.skip_def_levels(rep_levels_read)?,
None => (rep_levels_read, rep_levels_read),
};
@@ -1016,34 +991,22 @@ mod tests {
#[test]
fn test_read_batch_values_only() {
- test_read_batch_int32(16, &mut [0; 10], None, None); // < batch_size
- test_read_batch_int32(16, &mut [0; 16], None, None); // == batch_size
- test_read_batch_int32(16, &mut [0; 51], None, None); // > batch_size
+ test_read_batch_int32(16, 0, 0);
}
#[test]
fn test_read_batch_values_def_levels() {
- test_read_batch_int32(16, &mut [0; 10], Some(&mut [0; 10]), None);
- test_read_batch_int32(16, &mut [0; 16], Some(&mut [0; 16]), None);
- test_read_batch_int32(16, &mut [0; 51], Some(&mut [0; 51]), None);
+ test_read_batch_int32(16, MAX_DEF_LEVEL, 0);
}
#[test]
fn test_read_batch_values_rep_levels() {
- test_read_batch_int32(16, &mut [0; 10], None, Some(&mut [0; 10]));
- test_read_batch_int32(16, &mut [0; 16], None, Some(&mut [0; 16]));
- test_read_batch_int32(16, &mut [0; 51], None, Some(&mut [0; 51]));
- }
-
- #[test]
- fn test_read_batch_different_buf_sizes() {
- test_read_batch_int32(16, &mut [0; 8], Some(&mut [0; 9]), Some(&mut
[0; 7]));
- test_read_batch_int32(16, &mut [0; 1], Some(&mut [0; 9]), Some(&mut
[0; 3]));
+ test_read_batch_int32(16, 0, MAX_REP_LEVEL);
}
#[test]
fn test_read_batch_values_def_rep_levels() {
- test_read_batch_int32(128, &mut [0; 128], Some(&mut [0; 128]),
Some(&mut [0; 128]));
+ test_read_batch_int32(128, MAX_DEF_LEVEL, MAX_REP_LEVEL);
}
#[test]
@@ -1065,9 +1028,6 @@ mod tests {
let num_pages = 2;
let num_levels = 4;
let batch_size = 5;
- let values = &mut vec![0; 7];
- let def_levels = &mut vec![0; 7];
- let rep_levels = &mut vec![0; 7];
let mut tester = ColumnReaderTester::<Int32Type>::new();
tester.test_read_batch(
@@ -1078,9 +1038,6 @@ mod tests {
batch_size,
std::i32::MIN,
std::i32::MAX,
- values,
- Some(def_levels),
- Some(rep_levels),
false,
);
}
@@ -1153,24 +1110,8 @@ mod tests {
//
// This is a high level wrapper on `ColumnReaderTester` that allows us to
specify some
// boilerplate code for setting up definition/repetition levels and column
descriptor.
- fn test_read_batch_int32(
- batch_size: usize,
- values: &mut [i32],
- def_levels: Option<&mut [i16]>,
- rep_levels: Option<&mut [i16]>,
- ) {
+ fn test_read_batch_int32(batch_size: usize, max_def_level: i16,
max_rep_level: i16) {
let primitive_type = get_test_int32_type();
- // make field is required based on provided slices of levels
- let max_def_level = if def_levels.is_some() {
- MAX_DEF_LEVEL
- } else {
- 0
- };
- let max_rep_level = if rep_levels.is_some() {
- MAX_REP_LEVEL
- } else {
- 0
- };
let desc = Arc::new(ColumnDescriptor::new(
Arc::new(primitive_type),
@@ -1178,6 +1119,7 @@ mod tests {
max_rep_level,
ColumnPath::new(Vec::new()),
));
+
let mut tester = ColumnReaderTester::<Int32Type>::new();
tester.test_read_batch(
desc,
@@ -1187,9 +1129,6 @@ mod tests {
batch_size,
i32::MIN,
i32::MAX,
- values,
- def_levels,
- rep_levels,
false,
);
}
@@ -1317,21 +1256,8 @@ mod tests {
max: T::T,
use_v2: bool,
) {
- let mut def_levels = vec![0; num_levels * num_pages];
- let mut rep_levels = vec![0; num_levels * num_pages];
- let mut values = vec![T::T::default(); num_levels * num_pages];
self.test_read_batch(
- desc,
- encoding,
- num_pages,
- num_levels,
- batch_size,
- min,
- max,
- &mut values,
- Some(&mut def_levels),
- Some(&mut rep_levels),
- use_v2,
+ desc, encoding, num_pages, num_levels, batch_size, min, max,
use_v2,
);
}
@@ -1347,9 +1273,6 @@ mod tests {
batch_size: usize,
min: T::T,
max: T::T,
- values: &mut [T::T],
- mut def_levels: Option<&mut [i16]>,
- mut rep_levels: Option<&mut [i16]>,
use_v2: bool,
) {
let mut pages = VecDeque::new();
@@ -1372,18 +1295,19 @@ mod tests {
let column_reader: ColumnReader = get_column_reader(desc,
Box::new(page_reader));
let mut typed_column_reader =
get_typed_column_reader::<T>(column_reader);
+ let mut values = Vec::new();
+ let mut def_levels = Vec::new();
+ let mut rep_levels = Vec::new();
+
let mut curr_values_read = 0;
let mut curr_levels_read = 0;
loop {
- let actual_def_levels = def_levels.as_mut().map(|vec| &mut
vec[curr_levels_read..]);
- let actual_rep_levels = rep_levels.as_mut().map(|vec| &mut
vec[curr_levels_read..]);
-
let (_, values_read, levels_read) = typed_column_reader
.read_records(
batch_size,
- actual_def_levels,
- actual_rep_levels,
- &mut values[curr_values_read..],
+ Some(&mut def_levels),
+ Some(&mut rep_levels),
+ &mut values,
)
.expect("read_batch() should be OK");
@@ -1395,38 +1319,18 @@ mod tests {
}
}
- assert!(
- values.len() >= curr_values_read,
- "values.len() >= values_read"
- );
- assert_eq!(
- &values[0..curr_values_read],
- &self.values[0..curr_values_read],
- "values content doesn't match"
- );
+ assert_eq!(values, self.values, "values content doesn't match");
if max_def_level > 0 {
- let levels = def_levels.as_ref().unwrap();
- assert!(
- levels.len() >= curr_levels_read,
- "def_levels.len() >= levels_read"
- );
assert_eq!(
- &levels[0..curr_levels_read],
- &self.def_levels[0..curr_levels_read],
+ def_levels, self.def_levels,
"definition levels content doesn't match"
);
}
if max_rep_level > 0 {
- let levels = rep_levels.as_ref().unwrap();
- assert!(
- levels.len() >= curr_levels_read,
- "rep_levels.len() >= levels_read"
- );
assert_eq!(
- &levels[0..curr_levels_read],
- &self.rep_levels[0..curr_levels_read],
+ rep_levels, self.rep_levels,
"repetition levels content doesn't match"
);
}
diff --git a/parquet/src/column/reader/decoder.rs
b/parquet/src/column/reader/decoder.rs
index ef62724689..9889973b67 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -16,7 +16,6 @@
// under the License.
use std::collections::HashMap;
-use std::ops::Range;
use bytes::Bytes;
@@ -30,52 +29,18 @@ use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::{num_required_bits, BitReader};
-/// A slice of levels buffer data that is written to by a
[`ColumnLevelDecoder`]
-pub trait LevelsBufferSlice {
- /// Returns the capacity of this slice or `usize::MAX` if no limit
- fn capacity(&self) -> usize;
-
- /// Count the number of levels in `range` not equal to `max_level`
- fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize;
-}
-
-impl LevelsBufferSlice for [i16] {
- fn capacity(&self) -> usize {
- self.len()
- }
-
- fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize {
- self[range].iter().filter(|i| **i != max_level).count()
- }
-}
-
-/// A slice of values buffer data that is written to by a
[`ColumnValueDecoder`]
-pub trait ValuesBufferSlice {
- /// Returns the capacity of this slice or `usize::MAX` if no limit
- fn capacity(&self) -> usize;
-}
-
-impl<T> ValuesBufferSlice for [T] {
- fn capacity(&self) -> usize {
- self.len()
- }
-}
-
-/// Decodes level data to a [`LevelsBufferSlice`]
+/// Decodes level data
pub trait ColumnLevelDecoder {
- type Slice: LevelsBufferSlice + ?Sized;
+ type Buffer;
/// Set data for this [`ColumnLevelDecoder`]
fn set_data(&mut self, encoding: Encoding, data: Bytes);
}
pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
- /// Read up to `max_records` of repetition level data into `out[range]`
returning the number
+ /// Read up to `max_records` of repetition level data into `out` returning
the number
/// of complete records and levels read
///
- /// `range` is provided by the caller to allow for types such as
default-initialized `[T]`
- /// that only track capacity and not length
- ///
/// A record only ends when the data contains a subsequent repetition
level of 0,
/// it is therefore left to the caller to delimit the final record in a
column
///
@@ -84,9 +49,9 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
/// Implementations may panic if `range` overlaps with already written data
fn read_rep_levels(
&mut self,
- out: &mut Self::Slice,
- range: Range<usize>,
- max_records: usize,
+ out: &mut Self::Buffer,
+ num_records: usize,
+ num_levels: usize,
) -> Result<(usize, usize)>;
/// Skips over up to `num_levels` repetition levels corresponding to
`num_records` records,
@@ -103,27 +68,28 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
}
pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
- /// Read definition level data into `out[range]` returning the number of
levels read
+ /// Read up to `num_levels` definition levels into `out`
///
- /// `range` is provided by the caller to allow for types such as
default-initialized `[T]`
- /// that only track capacity and not length
+ /// Returns the number of values read, and the number of levels read
///
/// # Panics
///
/// Implementations may panic if `range` overlaps with already written data
- ///
- // TODO: Should this return the number of nulls
- fn read_def_levels(&mut self, out: &mut Self::Slice, range: Range<usize>)
-> Result<usize>;
+ fn read_def_levels(
+ &mut self,
+ out: &mut Self::Buffer,
+ num_levels: usize,
+ ) -> Result<(usize, usize)>;
/// Skips over `num_levels` definition levels
///
/// Returns the number of values skipped, and the number of levels skipped
- fn skip_def_levels(&mut self, num_levels: usize, max_def_level: i16) ->
Result<(usize, usize)>;
+ fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>;
}
-/// Decodes value data to a [`ValuesBufferSlice`]
+/// Decodes value data
pub trait ColumnValueDecoder {
- type Slice: ValuesBufferSlice + ?Sized;
+ type Buffer;
/// Create a new [`ColumnValueDecoder`]
fn new(col: &ColumnDescPtr) -> Self;
@@ -156,16 +122,13 @@ pub trait ColumnValueDecoder {
num_values: Option<usize>,
) -> Result<()>;
- /// Read values data into `out[range]` returning the number of values read
- ///
- /// `range` is provided by the caller to allow for types such as
default-initialized `[T]`
- /// that only track capacity and not length
+ /// Read up to `num_values` values into `out`
///
/// # Panics
///
/// Implementations may panic if `range` overlaps with already written data
///
- fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) ->
Result<usize>;
+ fn read(&mut self, out: &mut Self::Buffer, num_values: usize) ->
Result<usize>;
/// Skips over `num_values` values
///
@@ -184,7 +147,7 @@ pub struct ColumnValueDecoderImpl<T: DataType> {
}
impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
- type Slice = [T::T];
+ type Buffer = Vec<T::T>;
fn new(descr: &ColumnDescPtr) -> Self {
Self {
@@ -258,7 +221,7 @@ impl<T: DataType> ColumnValueDecoder for
ColumnValueDecoderImpl<T> {
Ok(())
}
- fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) ->
Result<usize> {
+ fn read(&mut self, out: &mut Self::Buffer, num_values: usize) ->
Result<usize> {
let encoding = self
.current_encoding
.expect("current_encoding should be set");
@@ -268,7 +231,12 @@ impl<T: DataType> ColumnValueDecoder for
ColumnValueDecoderImpl<T> {
.get_mut(&encoding)
.unwrap_or_else(|| panic!("decoder for encoding {encoding} should
be set"));
- current_decoder.get(&mut out[range])
+ // TODO: Push vec into decoder (#5177)
+ let start = out.len();
+ out.resize(start + num_values, T::T::default());
+ let read = current_decoder.get(&mut out[start..])?;
+ out.truncate(start + read);
+ Ok(read)
}
fn skip_values(&mut self, num_values: usize) -> Result<usize> {
@@ -319,6 +287,7 @@ impl LevelDecoder {
pub struct DefinitionLevelDecoderImpl {
decoder: Option<LevelDecoder>,
bit_width: u8,
+ max_level: i16,
}
impl DefinitionLevelDecoderImpl {
@@ -327,12 +296,13 @@ impl DefinitionLevelDecoderImpl {
Self {
decoder: None,
bit_width,
+ max_level,
}
}
}
impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
- type Slice = [i16];
+ type Buffer = Vec<i16>;
fn set_data(&mut self, encoding: Encoding, data: Bytes) {
self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width))
@@ -340,11 +310,23 @@ impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
}
impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl {
- fn read_def_levels(&mut self, out: &mut Self::Slice, range: Range<usize>)
-> Result<usize> {
- self.decoder.as_mut().unwrap().read(&mut out[range])
+ fn read_def_levels(
+ &mut self,
+ out: &mut Self::Buffer,
+ num_levels: usize,
+ ) -> Result<(usize, usize)> {
+ // TODO: Push vec into decoder (#5177)
+ let start = out.len();
+ out.resize(start + num_levels, 0);
+ let levels_read = self.decoder.as_mut().unwrap().read(&mut
out[start..])?;
+ out.truncate(start + levels_read);
+
+ let iter = out.iter().skip(start);
+ let values_read = iter.filter(|x| **x == self.max_level).count();
+ Ok((values_read, levels_read))
}
- fn skip_def_levels(&mut self, num_levels: usize, max_def_level: i16) ->
Result<(usize, usize)> {
+ fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>
{
let mut level_skip = 0;
let mut value_skip = 0;
let mut buf: Vec<i16> = vec![];
@@ -353,14 +335,14 @@ impl DefinitionLevelDecoder for
DefinitionLevelDecoderImpl {
let to_read = remaining_levels.min(SKIP_BUFFER_SIZE);
buf.resize(to_read, 0);
- let read = self.read_def_levels(&mut buf, 0..to_read)?;
- if read == 0 {
+ let (values_read, levels_read) = self.read_def_levels(&mut buf,
to_read)?;
+ if levels_read == 0 {
// Reached end of page
break;
}
- level_skip += read;
- value_skip += buf[..read].iter().filter(|x| **x ==
max_def_level).count();
+ level_skip += levels_read;
+ value_skip += values_read;
}
Ok((value_skip, level_skip))
@@ -423,7 +405,7 @@ impl RepetitionLevelDecoderImpl {
}
impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
- type Slice = [i16];
+ type Buffer = Vec<i16>;
fn set_data(&mut self, encoding: Encoding, data: Bytes) {
self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
@@ -435,16 +417,14 @@ impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
fn read_rep_levels(
&mut self,
- out: &mut Self::Slice,
- range: Range<usize>,
- max_records: usize,
+ out: &mut Self::Buffer,
+ num_records: usize,
+ num_levels: usize,
) -> Result<(usize, usize)> {
- let output = &mut out[range];
- let max_levels = output.len();
let mut total_records_read = 0;
let mut total_levels_read = 0;
- while total_records_read < max_records && total_levels_read <
max_levels {
+ while total_records_read < num_records && total_levels_read <
num_levels {
if self.buffer_len == self.buffer_offset {
self.fill_buf()?;
if self.buffer_len == 0 {
@@ -453,11 +433,11 @@ impl RepetitionLevelDecoder for
RepetitionLevelDecoderImpl {
}
let (partial, records_read, levels_read) = self.count_records(
- max_records - total_records_read,
- max_levels - total_levels_read,
+ num_records - total_records_read,
+ num_levels - total_levels_read,
);
- output[total_levels_read..total_levels_read +
levels_read].copy_from_slice(
+ out.extend_from_slice(
&self.buffer[self.buffer_offset..self.buffer_offset +
levels_read],
);
@@ -550,13 +530,13 @@ mod tests {
let (records_read, levels_read) = if skip {
decoder.skip_rep_levels(records, remaining_levels).unwrap()
} else {
- let mut decoded = vec![0; remaining_levels];
+ let mut decoded = Vec::new();
let (records_read, levels_read) = decoder
- .read_rep_levels(&mut decoded, 0..remaining_levels,
records)
+ .read_rep_levels(&mut decoded, records,
remaining_levels)
.unwrap();
assert_eq!(
- decoded[..levels_read],
+ decoded,
encoded[encoded.len() -
remaining_levels..][..levels_read]
);
(records_read, levels_read)
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 9f476595fb..597386b336 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -3292,19 +3292,20 @@ mod tests {
)
.unwrap(),
);
- let reader = get_test_column_reader::<T>(page_reader, max_def_level,
max_rep_level);
-
- let mut actual_values = vec![T::T::default(); max_batch_size];
- let mut actual_def_levels = def_levels.map(|_| vec![0i16;
max_batch_size]);
- let mut actual_rep_levels = rep_levels.map(|_| vec![0i16;
max_batch_size]);
-
- let (_, values_read, levels_read) = read_fully(
- reader,
- max_batch_size,
- actual_def_levels.as_mut(),
- actual_rep_levels.as_mut(),
- actual_values.as_mut_slice(),
- );
+ let mut reader = get_test_column_reader::<T>(page_reader,
max_def_level, max_rep_level);
+
+ let mut actual_values = Vec::with_capacity(max_batch_size);
+ let mut actual_def_levels = def_levels.map(|_|
Vec::with_capacity(max_batch_size));
+ let mut actual_rep_levels = rep_levels.map(|_|
Vec::with_capacity(max_batch_size));
+
+ let (_, values_read, levels_read) = reader
+ .read_records(
+ max_batch_size,
+ actual_def_levels.as_mut(),
+ actual_rep_levels.as_mut(),
+ &mut actual_values,
+ )
+ .unwrap();
// Assert values, definition and repetition levels.
@@ -3367,22 +3368,6 @@ mod tests {
assert_eq!(meta.encodings(), &encodings);
}
- /// Reads one batch of data, considering that batch is large enough to
capture all of
- /// the values and levels.
- fn read_fully<T: DataType>(
- mut reader: ColumnReaderImpl<T>,
- batch_size: usize,
- mut def_levels: Option<&mut Vec<i16>>,
- mut rep_levels: Option<&mut Vec<i16>>,
- values: &mut [T::T],
- ) -> (usize, usize, usize) {
- let actual_def_levels = def_levels.as_mut().map(|vec| &mut vec[..]);
- let actual_rep_levels = rep_levels.as_mut().map(|vec| &mut vec[..]);
- reader
- .read_records(batch_size, actual_def_levels, actual_rep_levels,
values)
- .unwrap()
- }
-
/// Returns column writer.
fn get_test_column_writer<'a, T: DataType>(
page_writer: Box<dyn PageWriter + 'a>,
diff --git a/parquet/src/file/serialized_reader.rs
b/parquet/src/file/serialized_reader.rs
index fbb172d3b3..468433f31d 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -1739,8 +1739,8 @@ mod tests {
let row_group_reader = reader.get_row_group(0).unwrap();
match row_group_reader.get_column_reader(0).unwrap() {
ColumnReader::Int64ColumnReader(mut reader) => {
- let mut buffer = [0; 1024];
- let mut def_levels = [0; 1024];
+ let mut buffer = Vec::with_capacity(1024);
+ let mut def_levels = Vec::with_capacity(1024);
let (num_records, num_values, num_levels) = reader
.read_records(1024, Some(&mut def_levels), None, &mut
buffer)
.unwrap();
@@ -1750,7 +1750,7 @@ mod tests {
assert_eq!(num_levels, 513);
let expected: Vec<i64> = (1..514).collect();
- assert_eq!(&buffer[..513], &expected);
+ assert_eq!(&buffer, &expected);
}
_ => unreachable!(),
}
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index f0b75f3025..3534bab816 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -1700,12 +1700,14 @@ mod tests {
let test_read = |reader: SerializedFileReader<Bytes>| {
let row_group = reader.get_row_group(0).unwrap();
- let mut out = [0; 4];
+ let mut out = Vec::with_capacity(4);
let c1 = row_group.get_column_reader(0).unwrap();
let mut c1 = get_typed_column_reader::<Int32Type>(c1);
c1.read_records(4, None, None, &mut out).unwrap();
assert_eq!(out, column_data[0]);
+ out.clear();
+
let c2 = row_group.get_column_reader(1).unwrap();
let mut c2 = get_typed_column_reader::<Int32Type>(c2);
c2.read_records(4, None, None, &mut out).unwrap();
diff --git a/parquet/src/record/triplet.rs b/parquet/src/record/triplet.rs
index 7647b23e28..902641c08b 100644
--- a/parquet/src/record/triplet.rs
+++ b/parquet/src/record/triplet.rs
@@ -296,17 +296,19 @@ impl<T: DataType> TypedTripletIter<T> {
// and therefore not advance `self.triplets_left`
while self.curr_triplet_index >= self.triplets_left {
let (records_read, values_read, levels_read) = {
- // Get slice of definition levels, if available
- let def_levels = self.def_levels.as_mut().map(|vec| &mut
vec[..]);
-
- // Get slice of repetition levels, if available
- let rep_levels = self.rep_levels.as_mut().map(|vec| &mut
vec[..]);
+ self.values.clear();
+ if let Some(x) = &mut self.def_levels {
+ x.clear()
+ }
+ if let Some(x) = &mut self.rep_levels {
+ x.clear()
+ }
// Buffer triplets
self.reader.read_records(
self.batch_size,
- def_levels,
- rep_levels,
+ self.def_levels.as_mut(),
+ self.rep_levels.as_mut(),
&mut self.values,
)?
};
@@ -333,6 +335,7 @@ impl<T: DataType> TypedTripletIter<T> {
// Note: if values_read == 0, then spacing will not be
triggered
let mut idx = values_read;
let def_levels = self.def_levels.as_ref().unwrap();
+ self.values.resize(levels_read, T::T::default());
for i in 0..levels_read {
if def_levels[levels_read - i - 1] == self.max_def_level {
idx -= 1; // This is done to avoid usize becoming a
negative value
diff --git a/parquet_derive/src/parquet_field.rs
b/parquet_derive/src/parquet_field.rs
index bb33b31968..8d759d11c4 100644
--- a/parquet_derive/src/parquet_field.rs
+++ b/parquet_derive/src/parquet_field.rs
@@ -243,15 +243,12 @@ impl Field {
pub fn reader_snippet(&self) -> proc_macro2::TokenStream {
let ident = &self.ident;
let column_reader = self.ty.column_reader();
- let parquet_type = self.ty.physical_type_as_rust();
// generate the code to read the column into a vector `vals`
let write_batch_expr = quote! {
- let mut vals_vec = Vec::new();
- vals_vec.resize(num_records, Default::default());
- let mut vals: &mut [#parquet_type] = vals_vec.as_mut_slice();
+ let mut vals = Vec::new();
if let #column_reader(mut typed) = column_reader {
- typed.read_records(num_records, None, None, vals)?;
+ typed.read_records(num_records, None, None, &mut vals)?;
} else {
panic!("Schema and struct disagree on type for {}",
stringify!{#ident});
}
@@ -646,23 +643,6 @@ impl Type {
}
}
- fn physical_type_as_rust(&self) -> proc_macro2::TokenStream {
- use parquet::basic::Type as BasicType;
-
- match self.physical_type() {
- BasicType::BOOLEAN => quote! { bool },
- BasicType::INT32 => quote! { i32 },
- BasicType::INT64 => quote! { i64 },
- BasicType::INT96 => unimplemented!("96-bit int currently is not supported"),
- BasicType::FLOAT => quote! { f32 },
- BasicType::DOUBLE => quote! { f64 },
- BasicType::BYTE_ARRAY => quote! { ::parquet::data_type::ByteArray },
- BasicType::FIXED_LEN_BYTE_ARRAY => {
- quote! { ::parquet::data_type::FixedLenByteArray }
- }
- }
- }
-
fn logical_type(&self) -> proc_macro2::TokenStream {
let last_part = self.last_part();
let leaf_type = self.leaf_type_recursive();
@@ -877,11 +857,9 @@ mod test {
snippet,
(quote! {
{
- let mut vals_vec = Vec::new();
- vals_vec.resize(num_records, Default::default());
- let mut vals: &mut[i64] = vals_vec.as_mut_slice();
+ let mut vals = Vec::new();
if let ColumnReader::Int64ColumnReader(mut typed) = column_reader {
- typed.read_records(num_records, None, None, vals)?;
+ typed.read_records(num_records, None, None, &mut vals)?;
} else {
panic!("Schema and struct disagree on type for {}", stringify!{ counter });
}
@@ -1256,11 +1234,9 @@ mod test {
let when = Field::from(&fields[0]);
assert_eq!(when.reader_snippet().to_string(),(quote!{
{
- let mut vals_vec = Vec::new();
- vals_vec.resize(num_records, Default::default());
- let mut vals: &mut[i64] = vals_vec.as_mut_slice();
+ let mut vals = Vec::new();
if let ColumnReader::Int64ColumnReader(mut typed) = column_reader {
- typed.read_records(num_records, None, None, vals)?;
+ typed.read_records(num_records, None, None, &mut vals)?;
} else {
panic!("Schema and struct disagree on type for {}", stringify!{ henceforth });
}
@@ -1326,11 +1302,9 @@ mod test {
let when = Field::from(&fields[0]);
assert_eq!(when.reader_snippet().to_string(),(quote!{
{
- let mut vals_vec = Vec::new();
- vals_vec.resize(num_records, Default::default());
- let mut vals: &mut [i32] = vals_vec.as_mut_slice();
+ let mut vals = Vec::new();
if let ColumnReader::Int32ColumnReader(mut typed) = column_reader {
- typed.read_records(num_records, None, None, vals)?;
+ typed.read_records(num_records, None, None, &mut vals)?;
} else {
panic!("Schema and struct disagree on type for {}", stringify!{ henceforth });
}
@@ -1396,11 +1370,9 @@ mod test {
let when = Field::from(&fields[0]);
assert_eq!(when.reader_snippet().to_string(),(quote!{
{
- let mut vals_vec = Vec::new();
- vals_vec.resize(num_records, Default::default());
- let mut vals: &mut [::parquet::data_type::ByteArray] = vals_vec.as_mut_slice();
+ let mut vals = Vec::new();
if let ColumnReader::ByteArrayColumnReader(mut typed) = column_reader {
- typed.read_records(num_records, None, None, vals)?;
+ typed.read_records(num_records, None, None, &mut vals)?;
} else {
panic!("Schema and struct disagree on type for {}", stringify!{ unique_id });
}