This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a1e8b572d Use Vec in ColumnReader (#5177) (#5193)
9a1e8b572d is described below

commit 9a1e8b572d11078e813fffe3d5c7106b6953d58c
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Dec 15 18:25:40 2023 +0000

    Use Vec in ColumnReader (#5177) (#5193)
    
    * Use Vec in ColumnReader (#5177)
    
    * Update parquet_derive
---
 parquet/src/arrow/array_reader/byte_array.rs       |  34 ++---
 .../arrow/array_reader/byte_array_dictionary.rs    |  45 +++---
 .../src/arrow/array_reader/fixed_len_byte_array.rs |  81 ++++------
 parquet/src/arrow/array_reader/mod.rs              |   4 +-
 parquet/src/arrow/array_reader/null_array.rs       |  10 +-
 parquet/src/arrow/array_reader/primitive_array.rs  |   8 +-
 parquet/src/arrow/buffer/dictionary_buffer.rs      |  45 +-----
 parquet/src/arrow/buffer/offset_buffer.rs          |  28 +---
 parquet/src/arrow/record_reader/buffer.rs          |  67 +-------
 .../src/arrow/record_reader/definition_levels.rs   |  57 +++----
 parquet/src/arrow/record_reader/mod.rs             |  72 +++------
 parquet/src/column/reader.rs                       | 170 +++++----------------
 parquet/src/column/reader/decoder.rs               | 138 +++++++----------
 parquet/src/column/writer/mod.rs                   |  43 ++----
 parquet/src/file/serialized_reader.rs              |   6 +-
 parquet/src/file/writer.rs                         |   4 +-
 parquet/src/record/triplet.rs                      |  17 ++-
 parquet_derive/src/parquet_field.rs                |  48 ++----
 18 files changed, 266 insertions(+), 611 deletions(-)

diff --git a/parquet/src/arrow/array_reader/byte_array.rs 
b/parquet/src/arrow/array_reader/byte_array.rs
index debe0d6109..19086878c1 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -31,11 +31,10 @@ use crate::schema::types::ColumnDescPtr;
 use arrow_array::{
     Array, ArrayRef, BinaryArray, Decimal128Array, Decimal256Array, 
OffsetSizeTrait,
 };
-use arrow_buffer::{i256, Buffer};
+use arrow_buffer::i256;
 use arrow_schema::DataType as ArrowType;
 use bytes::Bytes;
 use std::any::Any;
-use std::ops::Range;
 use std::sync::Arc;
 
 /// Returns an [`ArrayReader`] that decodes the provided byte array column
@@ -79,8 +78,8 @@ pub fn make_byte_array_reader(
 struct ByteArrayReader<I: OffsetSizeTrait> {
     data_type: ArrowType,
     pages: Box<dyn PageIterator>,
-    def_levels_buffer: Option<Buffer>,
-    rep_levels_buffer: Option<Buffer>,
+    def_levels_buffer: Option<Vec<i16>>,
+    rep_levels_buffer: Option<Vec<i16>>,
     record_reader: GenericRecordReader<OffsetBuffer<I>, 
ByteArrayColumnValueDecoder<I>>,
 }
 
@@ -154,11 +153,11 @@ impl<I: OffsetSizeTrait> ArrayReader for 
ByteArrayReader<I> {
     }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
-        self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+        self.def_levels_buffer.as_deref()
     }
 
     fn get_rep_levels(&self) -> Option<&[i16]> {
-        self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+        self.rep_levels_buffer.as_deref()
     }
 }
 
@@ -170,7 +169,7 @@ struct ByteArrayColumnValueDecoder<I: OffsetSizeTrait> {
 }
 
 impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayColumnValueDecoder<I> 
{
-    type Slice = OffsetBuffer<I>;
+    type Buffer = OffsetBuffer<I>;
 
     fn new(desc: &ColumnDescPtr) -> Self {
         let validate_utf8 = desc.converted_type() == ConvertedType::UTF8;
@@ -227,13 +226,13 @@ impl<I: OffsetSizeTrait> ColumnValueDecoder for 
ByteArrayColumnValueDecoder<I> {
         Ok(())
     }
 
-    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> 
Result<usize> {
+    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> 
Result<usize> {
         let decoder = self
             .decoder
             .as_mut()
             .ok_or_else(|| general_err!("no decoder set"))?;
 
-        decoder.read(out, range.end - range.start, self.dict.as_ref())
+        decoder.read(out, num_values, self.dict.as_ref())
     }
 
     fn skip_values(&mut self, num_values: usize) -> Result<usize> {
@@ -590,6 +589,7 @@ mod tests {
     use crate::arrow::array_reader::test_util::{byte_array_all_encodings, 
utf8_column};
     use crate::arrow::record_reader::buffer::ValuesBuffer;
     use arrow_array::{Array, StringArray};
+    use arrow_buffer::Buffer;
 
     #[test]
     fn test_byte_array_decoder() {
@@ -607,20 +607,20 @@ mod tests {
             let mut output = OffsetBuffer::<i32>::default();
             decoder.set_data(encoding, page, 4, Some(4)).unwrap();
 
-            assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1);
+            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
 
             assert_eq!(output.values.as_slice(), "hello".as_bytes());
             assert_eq!(output.offsets.as_slice(), &[0, 5]);
 
-            assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1);
+            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
             assert_eq!(output.values.as_slice(), "helloworld".as_bytes());
             assert_eq!(output.offsets.as_slice(), &[0, 5, 10]);
 
-            assert_eq!(decoder.read(&mut output, 2..4).unwrap(), 2);
+            assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
             assert_eq!(output.values.as_slice(), "helloworldab".as_bytes());
             assert_eq!(output.offsets.as_slice(), &[0, 5, 10, 11, 12]);
 
-            assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);
+            assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);
 
             let valid = [false, false, true, true, false, true, true, false, 
false];
             let valid_buffer = Buffer::from_iter(valid.iter().cloned());
@@ -662,7 +662,7 @@ mod tests {
             let mut output = OffsetBuffer::<i32>::default();
             decoder.set_data(encoding, page, 4, Some(4)).unwrap();
 
-            assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1);
+            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
 
             assert_eq!(output.values.as_slice(), "hello".as_bytes());
             assert_eq!(output.offsets.as_slice(), &[0, 5]);
@@ -670,11 +670,11 @@ mod tests {
             assert_eq!(decoder.skip_values(1).unwrap(), 1);
             assert_eq!(decoder.skip_values(1).unwrap(), 1);
 
-            assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1);
+            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
             assert_eq!(output.values.as_slice(), "hellob".as_bytes());
             assert_eq!(output.offsets.as_slice(), &[0, 5, 6]);
 
-            assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);
+            assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);
 
             let valid = [false, false, true, true, false, false];
             let valid_buffer = Buffer::from_iter(valid.iter().cloned());
@@ -705,7 +705,7 @@ mod tests {
         for (encoding, page) in pages.clone() {
             let mut output = OffsetBuffer::<i32>::default();
             decoder.set_data(encoding, page, 4, None).unwrap();
-            assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0);
+            assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);
         }
 
         // test nulls skip
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs 
b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index a381223541..3678f24621 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -17,18 +17,16 @@
 
 use std::any::Any;
 use std::marker::PhantomData;
-use std::ops::Range;
 use std::sync::Arc;
 
 use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
-use arrow_buffer::{ArrowNativeType, Buffer};
+use arrow_buffer::ArrowNativeType;
 use arrow_schema::DataType as ArrowType;
 use bytes::Bytes;
 
 use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, 
ByteArrayDecoderPlain};
 use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
 use crate::arrow::buffer::{dictionary_buffer::DictionaryBuffer, 
offset_buffer::OffsetBuffer};
-use crate::arrow::record_reader::buffer::BufferQueue;
 use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::{ConvertedType, Encoding};
@@ -124,8 +122,8 @@ pub fn make_byte_array_dictionary_reader(
 struct ByteArrayDictionaryReader<K: ArrowNativeType, V: OffsetSizeTrait> {
     data_type: ArrowType,
     pages: Box<dyn PageIterator>,
-    def_levels_buffer: Option<Buffer>,
-    rep_levels_buffer: Option<Buffer>,
+    def_levels_buffer: Option<Vec<i16>>,
+    rep_levels_buffer: Option<Vec<i16>>,
     record_reader: GenericRecordReader<DictionaryBuffer<K, V>, 
DictionaryDecoder<K, V>>,
 }
 
@@ -183,11 +181,11 @@ where
     }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
-        self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+        self.def_levels_buffer.as_deref()
     }
 
     fn get_rep_levels(&self) -> Option<&[i16]> {
-        self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+        self.rep_levels_buffer.as_deref()
     }
 }
 
@@ -224,7 +222,7 @@ where
     K: FromBytes + Ord + ArrowNativeType,
     V: OffsetSizeTrait,
 {
-    type Slice = DictionaryBuffer<K, V>;
+    type Buffer = DictionaryBuffer<K, V>;
 
     fn new(col: &ColumnDescPtr) -> Self {
         let validate_utf8 = col.converted_type() == ConvertedType::UTF8;
@@ -306,16 +304,16 @@ where
         Ok(())
     }
 
-    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> 
Result<usize> {
+    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> 
Result<usize> {
         match self.decoder.as_mut().expect("decoder set") {
             MaybeDictionaryDecoder::Fallback(decoder) => {
-                decoder.read(out.spill_values()?, range.end - range.start, 
None)
+                decoder.read(out.spill_values()?, num_values, None)
             }
             MaybeDictionaryDecoder::Dict {
                 decoder,
                 max_remaining_values,
             } => {
-                let len = (range.end - range.start).min(*max_remaining_values);
+                let len = num_values.min(*max_remaining_values);
 
                 let dict = self
                     .dict
@@ -332,8 +330,12 @@ where
                     Some(keys) => {
                         // Happy path - can just copy keys
                         // Keys will be validated on conversion to arrow
-                        let keys_slice = keys.get_output_slice(len);
-                        let len = decoder.get_batch(keys_slice)?;
+
+                        // TODO: Push vec into decoder (#5177)
+                        let start = keys.len();
+                        keys.resize(start + len, K::default());
+                        let len = decoder.get_batch(&mut keys[start..])?;
+                        keys.truncate(start + len);
                         *max_remaining_values -= len;
                         Ok(len)
                     }
@@ -381,6 +383,7 @@ where
 mod tests {
     use arrow::compute::cast;
     use arrow_array::{Array, StringArray};
+    use arrow_buffer::Buffer;
 
     use crate::arrow::array_reader::test_util::{
         byte_array_all_encodings, encode_dictionary, utf8_column,
@@ -416,7 +419,7 @@ mod tests {
             .unwrap();
 
         let mut output = DictionaryBuffer::<i32, i32>::default();
-        assert_eq!(decoder.read(&mut output, 0..3).unwrap(), 3);
+        assert_eq!(decoder.read(&mut output, 3).unwrap(), 3);
 
         let mut valid = vec![false, false, true, true, false, true];
         let valid_buffer = Buffer::from_iter(valid.iter().cloned());
@@ -424,7 +427,7 @@ mod tests {
 
         assert!(matches!(output, DictionaryBuffer::Dict { .. }));
 
-        assert_eq!(decoder.read(&mut output, 0..4).unwrap(), 4);
+        assert_eq!(decoder.read(&mut output, 4).unwrap(), 4);
 
         valid.extend_from_slice(&[false, false, true, true, false, true, true, 
false]);
         let valid_buffer = Buffer::from_iter(valid.iter().cloned());
@@ -484,17 +487,17 @@ mod tests {
         let mut output = DictionaryBuffer::<i32, i32>::default();
 
         // read two skip one
-        assert_eq!(decoder.read(&mut output, 0..2).unwrap(), 2);
+        assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
         assert_eq!(decoder.skip_values(1).unwrap(), 1);
 
         assert!(matches!(output, DictionaryBuffer::Dict { .. }));
 
         // read two skip one
-        assert_eq!(decoder.read(&mut output, 2..4).unwrap(), 2);
+        assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
         assert_eq!(decoder.skip_values(1).unwrap(), 1);
 
         // read one and test on skip at the end
-        assert_eq!(decoder.read(&mut output, 4..5).unwrap(), 1);
+        assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
         assert_eq!(decoder.skip_values(4).unwrap(), 0);
 
         let valid = [true, true, true, true, true];
@@ -536,7 +539,7 @@ mod tests {
 
         for (encoding, page) in pages {
             decoder.set_data(encoding, page, 4, Some(4)).unwrap();
-            assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 4);
+            assert_eq!(decoder.read(&mut output, 1024).unwrap(), 4);
         }
         let array = output.into_array(None, &data_type).unwrap();
         assert_eq!(array.data_type(), &data_type);
@@ -580,7 +583,7 @@ mod tests {
         for (encoding, page) in pages {
             decoder.set_data(encoding, page, 4, Some(4)).unwrap();
             decoder.skip_values(2).expect("skipping two values");
-            assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 2);
+            assert_eq!(decoder.read(&mut output, 1024).unwrap(), 2);
         }
         let array = output.into_array(None, &data_type).unwrap();
         assert_eq!(array.data_type(), &data_type);
@@ -641,7 +644,7 @@ mod tests {
         for (encoding, page) in pages.clone() {
             let mut output = DictionaryBuffer::<i32, i32>::default();
             decoder.set_data(encoding, page, 8, None).unwrap();
-            assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0);
+            assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);
 
             output.pad_nulls(0, 0, 8, &[0]);
             let array = output
diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs 
b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index 849aa37c56..a0d25d403c 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -18,12 +18,12 @@
 use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
 use crate::arrow::buffer::bit_util::{iter_set_bits_rev, sign_extend_be};
 use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
-use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer};
+use crate::arrow::record_reader::buffer::ValuesBuffer;
 use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::{Encoding, Type};
 use crate::column::page::PageIterator;
-use crate::column::reader::decoder::{ColumnValueDecoder, ValuesBufferSlice};
+use crate::column::reader::decoder::ColumnValueDecoder;
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use arrow_array::{
@@ -36,7 +36,6 @@ use arrow_schema::{DataType as ArrowType, IntervalUnit};
 use bytes::Bytes;
 use half::f16;
 use std::any::Any;
-use std::ops::Range;
 use std::sync::Arc;
 
 /// Returns an [`ArrayReader`] that decodes the provided fixed length byte 
array column
@@ -117,8 +116,8 @@ struct FixedLenByteArrayReader {
     data_type: ArrowType,
     byte_length: usize,
     pages: Box<dyn PageIterator>,
-    def_levels_buffer: Option<Buffer>,
-    rep_levels_buffer: Option<Buffer>,
+    def_levels_buffer: Option<Vec<i16>>,
+    rep_levels_buffer: Option<Vec<i16>>,
     record_reader: GenericRecordReader<FixedLenByteArrayBuffer, ValueDecoder>,
 }
 
@@ -135,13 +134,7 @@ impl FixedLenByteArrayReader {
             pages,
             def_levels_buffer: None,
             rep_levels_buffer: None,
-            record_reader: GenericRecordReader::new_with_records(
-                column_desc,
-                FixedLenByteArrayBuffer {
-                    buffer: Default::default(),
-                    byte_length,
-                },
-            ),
+            record_reader: GenericRecordReader::new(column_desc),
         }
     }
 }
@@ -164,7 +157,7 @@ impl ArrayReader for FixedLenByteArrayReader {
 
         let array_data = 
ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
             .len(self.record_reader.num_values())
-            .add_buffer(record_data)
+            .add_buffer(Buffer::from_vec(record_data.buffer))
             .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
 
         let binary = FixedSizeBinaryArray::from(unsafe { 
array_data.build_unchecked() });
@@ -231,41 +224,19 @@ impl ArrayReader for FixedLenByteArrayReader {
     }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
-        self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+        self.def_levels_buffer.as_deref()
     }
 
     fn get_rep_levels(&self) -> Option<&[i16]> {
-        self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+        self.rep_levels_buffer.as_deref()
     }
 }
 
+#[derive(Default)]
 struct FixedLenByteArrayBuffer {
     buffer: Vec<u8>,
     /// The length of each element in bytes
-    byte_length: usize,
-}
-
-impl ValuesBufferSlice for FixedLenByteArrayBuffer {
-    fn capacity(&self) -> usize {
-        usize::MAX
-    }
-}
-
-impl BufferQueue for FixedLenByteArrayBuffer {
-    type Output = Buffer;
-    type Slice = Self;
-
-    fn consume(&mut self) -> Self::Output {
-        Buffer::from_vec(self.buffer.consume())
-    }
-
-    fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice {
-        self
-    }
-
-    fn truncate_buffer(&mut self, len: usize) {
-        assert_eq!(self.buffer.len(), len * self.byte_length);
-    }
+    byte_length: Option<usize>,
 }
 
 impl ValuesBuffer for FixedLenByteArrayBuffer {
@@ -276,12 +247,11 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
         levels_read: usize,
         valid_mask: &[u8],
     ) {
-        assert_eq!(
-            self.buffer.len(),
-            (read_offset + values_read) * self.byte_length
-        );
+        let byte_length = self.byte_length.unwrap_or_default();
+
+        assert_eq!(self.buffer.len(), (read_offset + values_read) * 
byte_length);
         self.buffer
-            .resize((read_offset + levels_read) * self.byte_length, 0);
+            .resize((read_offset + levels_read) * byte_length, 0);
 
         let values_range = read_offset..read_offset + values_read;
         for (value_pos, level_pos) in 
values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
@@ -290,10 +260,10 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
                 break;
             }
 
-            let level_pos_bytes = level_pos * self.byte_length;
-            let value_pos_bytes = value_pos * self.byte_length;
+            let level_pos_bytes = level_pos * byte_length;
+            let value_pos_bytes = value_pos * byte_length;
 
-            for i in 0..self.byte_length {
+            for i in 0..byte_length {
                 self.buffer[level_pos_bytes + i] = self.buffer[value_pos_bytes 
+ i]
             }
         }
@@ -307,7 +277,7 @@ struct ValueDecoder {
 }
 
 impl ColumnValueDecoder for ValueDecoder {
-    type Slice = FixedLenByteArrayBuffer;
+    type Buffer = FixedLenByteArrayBuffer;
 
     fn new(col: &ColumnDescPtr) -> Self {
         Self {
@@ -374,13 +344,16 @@ impl ColumnValueDecoder for ValueDecoder {
         Ok(())
     }
 
-    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> 
Result<usize> {
-        assert_eq!(self.byte_length, out.byte_length);
+    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> 
Result<usize> {
+        match out.byte_length {
+            Some(x) => assert_eq!(x, self.byte_length),
+            None => out.byte_length = Some(self.byte_length),
+        }
 
-        let len = range.end - range.start;
         match self.decoder.as_mut().unwrap() {
             Decoder::Plain { offset, buf } => {
-                let to_read = (len * self.byte_length).min(buf.len() - 
*offset) / self.byte_length;
+                let to_read =
+                    (num_values * self.byte_length).min(buf.len() - *offset) / 
self.byte_length;
                 let end_offset = *offset + to_read * self.byte_length;
                 out.buffer
                     .extend_from_slice(&buf.as_ref()[*offset..end_offset]);
@@ -394,7 +367,7 @@ impl ColumnValueDecoder for ValueDecoder {
                     return Ok(0);
                 }
 
-                decoder.read(len, |keys| {
+                decoder.read(num_values, |keys| {
                     out.buffer.reserve(keys.len() * self.byte_length);
                     for key in keys {
                         let offset = *key as usize * self.byte_length;
@@ -405,7 +378,7 @@ impl ColumnValueDecoder for ValueDecoder {
                 })
             }
             Decoder::Delta { decoder } => {
-                let to_read = len.min(decoder.remaining());
+                let to_read = num_values.min(decoder.remaining());
                 out.buffer.reserve(to_read * self.byte_length);
 
                 decoder.read(to_read, |slice| {
diff --git a/parquet/src/arrow/array_reader/mod.rs 
b/parquet/src/arrow/array_reader/mod.rs
index a4ee504059..c4e9fc5fa0 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -129,7 +129,7 @@ fn read_records<V, CV>(
 ) -> Result<usize>
 where
     V: ValuesBuffer,
-    CV: ColumnValueDecoder<Slice = V::Slice>,
+    CV: ColumnValueDecoder<Buffer = V>,
 {
     let mut records_read = 0usize;
     while records_read < batch_size {
@@ -163,7 +163,7 @@ fn skip_records<V, CV>(
 ) -> Result<usize>
 where
     V: ValuesBuffer,
-    CV: ColumnValueDecoder<Slice = V::Slice>,
+    CV: ColumnValueDecoder<Buffer = V>,
 {
     let mut records_skipped = 0usize;
     while records_skipped < batch_size {
diff --git a/parquet/src/arrow/array_reader/null_array.rs 
b/parquet/src/arrow/array_reader/null_array.rs
index bb32fb307f..838db854e0 100644
--- a/parquet/src/arrow/array_reader/null_array.rs
+++ b/parquet/src/arrow/array_reader/null_array.rs
@@ -22,7 +22,7 @@ use crate::data_type::DataType;
 use crate::errors::Result;
 use crate::schema::types::ColumnDescPtr;
 use arrow_array::ArrayRef;
-use arrow_buffer::{ArrowNativeType, Buffer};
+use arrow_buffer::ArrowNativeType;
 use arrow_schema::DataType as ArrowType;
 use std::any::Any;
 use std::sync::Arc;
@@ -36,8 +36,8 @@ where
 {
     data_type: ArrowType,
     pages: Box<dyn PageIterator>,
-    def_levels_buffer: Option<Buffer>,
-    rep_levels_buffer: Option<Buffer>,
+    def_levels_buffer: Option<Vec<i16>>,
+    rep_levels_buffer: Option<Vec<i16>>,
     record_reader: RecordReader<T>,
 }
 
@@ -99,10 +99,10 @@ where
     }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
-        self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+        self.def_levels_buffer.as_deref()
     }
 
     fn get_rep_levels(&self) -> Option<&[i16]> {
-        self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+        self.rep_levels_buffer.as_deref()
     }
 }
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs 
b/parquet/src/arrow/array_reader/primitive_array.rs
index 507b6215ca..07ecc27d9b 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -77,8 +77,8 @@ where
 {
     data_type: ArrowType,
     pages: Box<dyn PageIterator>,
-    def_levels_buffer: Option<Buffer>,
-    rep_levels_buffer: Option<Buffer>,
+    def_levels_buffer: Option<Vec<i16>>,
+    rep_levels_buffer: Option<Vec<i16>>,
     record_reader: RecordReader<T>,
 }
 
@@ -287,11 +287,11 @@ where
     }
 
     fn get_def_levels(&self) -> Option<&[i16]> {
-        self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+        self.def_levels_buffer.as_deref()
     }
 
     fn get_rep_levels(&self) -> Option<&[i16]> {
-        self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+        self.rep_levels_buffer.as_deref()
     }
 }
 
diff --git a/parquet/src/arrow/buffer/dictionary_buffer.rs 
b/parquet/src/arrow/buffer/dictionary_buffer.rs
index d0f63024ed..9e5b2293aa 100644
--- a/parquet/src/arrow/buffer/dictionary_buffer.rs
+++ b/parquet/src/arrow/buffer/dictionary_buffer.rs
@@ -16,8 +16,7 @@
 // under the License.
 
 use crate::arrow::buffer::offset_buffer::OffsetBuffer;
-use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer};
-use crate::column::reader::decoder::ValuesBufferSlice;
+use crate::arrow::record_reader::buffer::ValuesBuffer;
 use crate::errors::{ParquetError, Result};
 use arrow_array::{make_array, Array, ArrayRef, OffsetSizeTrait};
 use arrow_buffer::{ArrowNativeType, Buffer};
@@ -185,12 +184,6 @@ impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait> 
DictionaryBuffer<K, V> {
     }
 }
 
-impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBufferSlice for 
DictionaryBuffer<K, V> {
-    fn capacity(&self) -> usize {
-        usize::MAX
-    }
-}
-
 impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for 
DictionaryBuffer<K, V> {
     fn pad_nulls(
         &mut self,
@@ -211,34 +204,6 @@ impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer 
for DictionaryBuffer<K
     }
 }
 
-impl<K: ArrowNativeType, V: OffsetSizeTrait> BufferQueue for 
DictionaryBuffer<K, V> {
-    type Output = Self;
-    type Slice = Self;
-
-    fn consume(&mut self) -> Self::Output {
-        match self {
-            Self::Dict { keys, values } => Self::Dict {
-                keys: std::mem::take(keys),
-                values: values.clone(),
-            },
-            Self::Values { values } => Self::Values {
-                values: values.consume(),
-            },
-        }
-    }
-
-    fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice {
-        self
-    }
-
-    fn truncate_buffer(&mut self, len: usize) {
-        match self {
-            Self::Dict { keys, .. } => keys.truncate_buffer(len),
-            Self::Values { values } => values.truncate_buffer(len),
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -274,7 +239,7 @@ mod tests {
         buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice());
 
         assert_eq!(buffer.len(), 13);
-        let split = buffer.consume();
+        let split = std::mem::take(&mut buffer);
 
         let array = split.into_array(Some(null_buffer), &dict_type).unwrap();
         assert_eq!(array.data_type(), &dict_type);
@@ -309,7 +274,9 @@ mod tests {
             .unwrap()
             .extend_from_slice(&[0, 1, 0, 1]);
 
-        let array = buffer.consume().into_array(None, &dict_type).unwrap();
+        let array = std::mem::take(&mut buffer)
+            .into_array(None, &dict_type)
+            .unwrap();
         assert_eq!(array.data_type(), &dict_type);
 
         let strings = cast(&array, &ArrowType::Utf8).unwrap();
@@ -320,7 +287,7 @@ mod tests {
         );
 
         // Can recreate with new dictionary as keys empty
-        assert!(matches!(&buffer, DictionaryBuffer::Dict { .. }));
+        assert!(matches!(&buffer, DictionaryBuffer::Values { .. }));
         assert_eq!(buffer.len(), 0);
         let d3 = Arc::new(StringArray::from(vec!["bongo"])) as ArrayRef;
         buffer.as_keys(&d3).unwrap().extend_from_slice(&[0, 0]);
diff --git a/parquet/src/arrow/buffer/offset_buffer.rs 
b/parquet/src/arrow/buffer/offset_buffer.rs
index 459c94ed28..ce9eb1142a 100644
--- a/parquet/src/arrow/buffer/offset_buffer.rs
+++ b/parquet/src/arrow/buffer/offset_buffer.rs
@@ -16,8 +16,7 @@
 // under the License.
 
 use crate::arrow::buffer::bit_util::iter_set_bits_rev;
-use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer};
-use crate::column::reader::decoder::ValuesBufferSlice;
+use crate::arrow::record_reader::buffer::ValuesBuffer;
 use crate::errors::{ParquetError, Result};
 use arrow_array::{make_array, ArrayRef, OffsetSizeTrait};
 use arrow_buffer::{ArrowNativeType, Buffer};
@@ -141,23 +140,6 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {
     }
 }
 
-impl<I: OffsetSizeTrait> BufferQueue for OffsetBuffer<I> {
-    type Output = Self;
-    type Slice = Self;
-
-    fn consume(&mut self) -> Self::Output {
-        std::mem::take(self)
-    }
-
-    fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice {
-        self
-    }
-
-    fn truncate_buffer(&mut self, len: usize) {
-        assert_eq!(self.offsets.len(), len + 1);
-    }
-}
-
 impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
     fn pad_nulls(
         &mut self,
@@ -208,12 +190,6 @@ impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
     }
 }
 
-impl<I: OffsetSizeTrait> ValuesBufferSlice for OffsetBuffer<I> {
-    fn capacity(&self) -> usize {
-        usize::MAX
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -250,7 +226,7 @@ mod tests {
         for v in ["hello", "world", "cupcakes", "a", "b", "c"] {
             buffer.try_push(v.as_bytes(), false).unwrap()
         }
-        let split = buffer.consume();
+        let split = std::mem::take(&mut buffer);
 
         let array = split.into_array(None, ArrowType::Utf8);
         let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
diff --git a/parquet/src/arrow/record_reader/buffer.rs 
b/parquet/src/arrow/record_reader/buffer.rs
index 3914710ff7..880407a547 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -17,69 +17,8 @@
 
 use crate::arrow::buffer::bit_util::iter_set_bits_rev;
 
-/// A buffer that supports writing new data to the end, and removing data from 
the front
-///
-/// Used by [RecordReader](`super::RecordReader`) to buffer up values before 
returning a
-/// potentially smaller number of values, corresponding to a whole number of 
semantic records
-pub trait BufferQueue: Sized {
-    type Output: Sized;
-
-    type Slice: ?Sized;
-
-    /// Consumes the contents of this [`BufferQueue`]
-    fn consume(&mut self) -> Self::Output;
-
-    /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can 
be used
-    /// to append data to the end of this [`BufferQueue`]
-    ///
-    /// NB: writes to the returned slice will not update the length of 
[`BufferQueue`]
-    /// instead a subsequent call should be made to 
[`BufferQueue::truncate_buffer`]
-    fn get_output_slice(&mut self, batch_size: usize) -> &mut Self::Slice;
-
-    /// Sets the length of the [`BufferQueue`].
-    ///
-    /// Intended to be used in combination with 
[`BufferQueue::get_output_slice`]
-    ///
-    /// # Panics
-    ///
-    /// Implementations must panic if `len` is beyond the initialized length
-    ///
-    /// Implementations may panic if `set_len` is called with less than what 
has been written
-    ///
-    /// This distinction is to allow for implementations that return a default 
initialized
-    /// [BufferQueue::Slice`] which doesn't track capacity and length 
separately
-    ///
-    /// For example, [`BufferQueue`] returns a default-initialized `&mut [T]`, 
and does not
-    /// track how much of this slice is actually written to by the caller. 
This is still
-    /// safe as the slice is default-initialized.
-    ///
-    fn truncate_buffer(&mut self, len: usize);
-}
-
-impl<T: Copy + Default> BufferQueue for Vec<T> {
-    type Output = Self;
-
-    type Slice = [T];
-
-    fn consume(&mut self) -> Self::Output {
-        std::mem::take(self)
-    }
-
-    fn get_output_slice(&mut self, batch_size: usize) -> &mut Self::Slice {
-        let len = self.len();
-        self.resize(len + batch_size, T::default());
-        &mut self[len..]
-    }
-
-    fn truncate_buffer(&mut self, len: usize) {
-        assert!(len <= self.len());
-        self.truncate(len)
-    }
-}
-
-/// A [`BufferQueue`] capable of storing column values
-pub trait ValuesBuffer: BufferQueue {
-    ///
+/// A buffer that supports padding with nulls
+pub trait ValuesBuffer: Default {
     /// If a column contains nulls, more level data may be read than value 
data, as null
     /// values are not encoded. Therefore, first the levels data is read, the 
null count
     /// determined, and then the corresponding number of values read to a 
[`ValuesBuffer`].
@@ -111,7 +50,7 @@ impl<T: Copy + Default> ValuesBuffer for Vec<T> {
         levels_read: usize,
         valid_mask: &[u8],
     ) {
-        assert!(self.len() >= read_offset + levels_read);
+        self.resize(read_offset + levels_read, T::default());
 
         let values_range = read_offset..read_offset + values_read;
         for (value_pos, level_pos) in 
values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs 
b/parquet/src/arrow/record_reader/definition_levels.rs
index fa041f5fdb..87f531df09 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -15,8 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::ops::Range;
-
 use arrow_array::builder::BooleanBufferBuilder;
 use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk;
 use arrow_buffer::Buffer;
@@ -25,7 +23,7 @@ use bytes::Bytes;
 use crate::arrow::buffer::bit_util::count_set_bits;
 use crate::basic::Encoding;
 use crate::column::reader::decoder::{
-    ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl, 
LevelsBufferSlice,
+    ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl,
 };
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
@@ -85,18 +83,13 @@ impl DefinitionLevelBuffer {
     }
 
     /// Returns the built level data
-    pub fn consume_levels(&mut self) -> Option<Buffer> {
+    pub fn consume_levels(&mut self) -> Option<Vec<i16>> {
         match &mut self.inner {
-            BufferInner::Full { levels, .. } => 
Some(Buffer::from_vec(std::mem::take(levels))),
+            BufferInner::Full { levels, .. } => Some(std::mem::take(levels)),
             BufferInner::Mask { .. } => None,
         }
     }
 
-    pub fn set_len(&mut self, len: usize) {
-        assert_eq!(self.nulls().len(), len);
-        self.len = len;
-    }
-
     /// Returns the built null bitmask
     pub fn consume_bitmask(&mut self) -> Buffer {
         self.len = 0;
@@ -114,18 +107,6 @@ impl DefinitionLevelBuffer {
     }
 }
 
-impl LevelsBufferSlice for DefinitionLevelBuffer {
-    fn capacity(&self) -> usize {
-        usize::MAX
-    }
-
-    fn count_nulls(&self, range: Range<usize>, _max_level: i16) -> usize {
-        let total_count = range.end - range.start;
-        let range = range.start + self.len..range.end + self.len;
-        total_count - count_set_bits(self.nulls().as_slice(), range)
-    }
-}
-
 enum MaybePacked {
     Packed(PackedDecoder),
     Fallback(DefinitionLevelDecoderImpl),
@@ -148,7 +129,7 @@ impl DefinitionLevelBufferDecoder {
 }
 
 impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
-    type Slice = DefinitionLevelBuffer;
+    type Buffer = DefinitionLevelBuffer;
 
     fn set_data(&mut self, encoding: Encoding, data: Bytes) {
         match &mut self.decoder {
@@ -159,7 +140,11 @@ impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
 }
 
 impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
-    fn read_def_levels(&mut self, writer: &mut Self::Slice, range: 
Range<usize>) -> Result<usize> {
+    fn read_def_levels(
+        &mut self,
+        writer: &mut Self::Buffer,
+        num_levels: usize,
+    ) -> Result<(usize, usize)> {
         match (&mut writer.inner, &mut self.decoder) {
             (
                 BufferInner::Full {
@@ -170,33 +155,33 @@ impl DefinitionLevelDecoder for 
DefinitionLevelBufferDecoder {
                 MaybePacked::Fallback(decoder),
             ) => {
                 assert_eq!(self.max_level, *max_level);
-                assert_eq!(range.start + writer.len, nulls.len());
 
-                levels.resize(range.end + writer.len, 0);
-
-                let slice = &mut levels[writer.len..];
-                let levels_read = decoder.read_def_levels(slice, 
range.clone())?;
+                let start = levels.len();
+                let (values_read, levels_read) = 
decoder.read_def_levels(levels, num_levels)?;
 
                 nulls.reserve(levels_read);
-                for i in &slice[range.start..range.start + levels_read] {
-                    nulls.append(i == max_level)
+                for i in &levels[start..] {
+                    nulls.append(i == max_level);
                 }
 
-                Ok(levels_read)
+                Ok((values_read, levels_read))
             }
             (BufferInner::Mask { nulls }, MaybePacked::Packed(decoder)) => {
                 assert_eq!(self.max_level, 1);
-                assert_eq!(range.start + writer.len, nulls.len());
 
-                decoder.read(nulls, range.end - range.start)
+                let start = nulls.len();
+                let levels_read = decoder.read(nulls, num_levels)?;
+
+                let values_read = count_set_bits(nulls.as_slice(), 
start..start + levels_read);
+                Ok((values_read, levels_read))
             }
             _ => unreachable!("inconsistent null mask"),
         }
     }
 
-    fn skip_def_levels(&mut self, num_levels: usize, max_def_level: i16) -> 
Result<(usize, usize)> {
+    fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)> 
{
         match &mut self.decoder {
-            MaybePacked::Fallback(decoder) => 
decoder.skip_def_levels(num_levels, max_def_level),
+            MaybePacked::Fallback(decoder) => 
decoder.skip_def_levels(num_levels),
             MaybePacked::Packed(decoder) => decoder.skip(num_levels),
         }
     }
diff --git a/parquet/src/arrow/record_reader/mod.rs 
b/parquet/src/arrow/record_reader/mod.rs
index 49c69c87e3..7456da053b 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -18,7 +18,7 @@
 use arrow_buffer::Buffer;
 
 use crate::arrow::record_reader::{
-    buffer::{BufferQueue, ValuesBuffer},
+    buffer::ValuesBuffer,
     definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder},
 };
 use crate::column::reader::decoder::RepetitionLevelDecoderImpl;
@@ -62,28 +62,18 @@ pub struct GenericRecordReader<V, CV> {
 
 impl<V, CV> GenericRecordReader<V, CV>
 where
-    V: ValuesBuffer + Default,
-    CV: ColumnValueDecoder<Slice = V::Slice>,
+    V: ValuesBuffer,
+    CV: ColumnValueDecoder<Buffer = V>,
 {
     /// Create a new [`GenericRecordReader`]
     pub fn new(desc: ColumnDescPtr) -> Self {
-        Self::new_with_records(desc, V::default())
-    }
-}
-
-impl<V, CV> GenericRecordReader<V, CV>
-where
-    V: ValuesBuffer,
-    CV: ColumnValueDecoder<Slice = V::Slice>,
-{
-    pub fn new_with_records(desc: ColumnDescPtr, records: V) -> Self {
         let def_levels = (desc.max_def_level() > 0)
             .then(|| DefinitionLevelBuffer::new(&desc, 
packed_null_mask(&desc)));
 
         let rep_levels = (desc.max_rep_level() > 0).then(Vec::new);
 
         Self {
-            values: records,
+            values: V::default(),
             def_levels,
             rep_levels,
             column_reader: None,
@@ -166,22 +156,20 @@ where
     /// The implementation has side effects. It will create a new buffer to 
hold those
     /// definition level values that have already been read into memory but 
not counted
     /// as record values, e.g. those from `self.num_values` to 
`self.values_written`.
-    pub fn consume_def_levels(&mut self) -> Option<Buffer> {
+    pub fn consume_def_levels(&mut self) -> Option<Vec<i16>> {
         self.def_levels.as_mut().and_then(|x| x.consume_levels())
     }
 
     /// Return repetition level data.
     /// The side effect is similar to `consume_def_levels`.
-    pub fn consume_rep_levels(&mut self) -> Option<Buffer> {
-        self.rep_levels
-            .as_mut()
-            .map(|x| Buffer::from_vec(x.consume()))
+    pub fn consume_rep_levels(&mut self) -> Option<Vec<i16>> {
+        self.rep_levels.as_mut().map(std::mem::take)
     }
 
     /// Returns currently stored buffer data.
     /// The side effect is similar to `consume_def_levels`.
-    pub fn consume_record_data(&mut self) -> V::Output {
-        self.values.consume()
+    pub fn consume_record_data(&mut self) -> V {
+        std::mem::take(&mut self.values)
     }
 
     /// Returns currently stored null bitmap data.
@@ -207,18 +195,13 @@ where
 
     /// Try to read one batch of data returning the number of records read
     fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
-        let rep_levels = self
-            .rep_levels
-            .as_mut()
-            .map(|levels| levels.get_output_slice(batch_size));
-        let def_levels = self.def_levels.as_mut();
-        let values = self.values.get_output_slice(batch_size);
-
-        let (records_read, values_read, levels_read) = self
-            .column_reader
-            .as_mut()
-            .unwrap()
-            .read_records(batch_size, def_levels, rep_levels, values)?;
+        let (records_read, values_read, levels_read) =
+            self.column_reader.as_mut().unwrap().read_records(
+                batch_size,
+                self.def_levels.as_mut(),
+                self.rep_levels.as_mut(),
+                &mut self.values,
+            )?;
 
         if values_read < levels_read {
             let def_levels = self.def_levels.as_ref().ok_or_else(|| {
@@ -235,13 +218,6 @@ where
 
         self.num_records += records_read;
         self.num_values += levels_read;
-        self.values.truncate_buffer(self.num_values);
-        if let Some(ref mut buf) = self.rep_levels {
-            buf.truncate_buffer(self.num_values)
-        };
-        if let Some(ref mut buf) = self.def_levels {
-            buf.set_len(self.num_values)
-        };
         Ok(records_read)
     }
 }
@@ -258,7 +234,6 @@ mod tests {
     use std::sync::Arc;
 
     use arrow::buffer::Buffer;
-    use arrow_array::builder::Int16BufferBuilder;
 
     use crate::basic::Encoding;
     use crate::data_type::Int32Type;
@@ -417,11 +392,8 @@ mod tests {
         }
 
         // Verify result def levels
-        let mut bb = Int16BufferBuilder::new(7);
-        bb.append_slice(&[1i16, 2i16, 0i16, 2i16, 2i16, 0i16, 2i16]);
-        let expected_def_levels = bb.finish();
         assert_eq!(
-            Some(expected_def_levels),
+            Some(vec![1i16, 2i16, 0i16, 2i16, 2i16, 0i16, 2i16]),
             record_reader.consume_def_levels()
         );
 
@@ -526,11 +498,8 @@ mod tests {
         }
 
         // Verify result def levels
-        let mut bb = Int16BufferBuilder::new(9);
-        bb.append_slice(&[2i16, 0i16, 1i16, 2i16, 2i16, 2i16, 2i16, 2i16, 
2i16]);
-        let expected_def_levels = bb.finish();
         assert_eq!(
-            Some(expected_def_levels),
+            Some(vec![2i16, 0i16, 1i16, 2i16, 2i16, 2i16, 2i16, 2i16, 2i16]),
             record_reader.consume_def_levels()
         );
 
@@ -792,11 +761,8 @@ mod tests {
         }
 
         // Verify result def levels
-        let mut bb = Int16BufferBuilder::new(7);
-        bb.append_slice(&[0i16, 2i16, 2i16]);
-        let expected_def_levels = bb.finish();
         assert_eq!(
-            Some(expected_def_levels),
+            Some(vec![0i16, 2i16, 2i16]),
             record_reader.consume_def_levels()
         );
 
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 6c712ead62..4f861917c9 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -23,7 +23,7 @@ use super::page::{Page, PageReader};
 use crate::basic::*;
 use crate::column::reader::decoder::{
     ColumnValueDecoder, ColumnValueDecoderImpl, DefinitionLevelDecoder, 
DefinitionLevelDecoderImpl,
-    LevelsBufferSlice, RepetitionLevelDecoder, RepetitionLevelDecoderImpl, 
ValuesBufferSlice,
+    RepetitionLevelDecoder, RepetitionLevelDecoderImpl,
 };
 use crate::data_type::*;
 use crate::errors::{ParquetError, Result};
@@ -193,9 +193,9 @@ where
     pub fn read_batch(
         &mut self,
         batch_size: usize,
-        def_levels: Option<&mut D::Slice>,
-        rep_levels: Option<&mut R::Slice>,
-        values: &mut V::Slice,
+        def_levels: Option<&mut D::Buffer>,
+        rep_levels: Option<&mut R::Buffer>,
+        values: &mut V::Buffer,
     ) -> Result<(usize, usize)> {
         let (_, values, levels) = self.read_records(batch_size, def_levels, 
rep_levels, values)?;
 
@@ -219,41 +219,26 @@ where
     pub fn read_records(
         &mut self,
         max_records: usize,
-        mut def_levels: Option<&mut D::Slice>,
-        mut rep_levels: Option<&mut R::Slice>,
-        values: &mut V::Slice,
+        mut def_levels: Option<&mut D::Buffer>,
+        mut rep_levels: Option<&mut R::Buffer>,
+        values: &mut V::Buffer,
     ) -> Result<(usize, usize, usize)> {
-        let mut max_levels = values.capacity().min(max_records);
-        if let Some(ref levels) = def_levels {
-            max_levels = max_levels.min(levels.capacity());
-        }
-        if let Some(ref levels) = rep_levels {
-            max_levels = max_levels.min(levels.capacity())
-        }
-
         let mut total_records_read = 0;
         let mut total_levels_read = 0;
         let mut total_values_read = 0;
 
-        while total_records_read < max_records
-            && total_levels_read < max_levels
-            && self.has_next()?
-        {
+        while total_records_read < max_records && self.has_next()? {
             let remaining_records = max_records - total_records_read;
             let remaining_levels = self.num_buffered_values - 
self.num_decoded_values;
-            let levels_to_read = remaining_levels.min(max_levels - 
total_levels_read);
 
-            let (records_read, levels_read) = match 
self.rep_level_decoder.as_mut() {
+            let (records_read, levels_to_read) = match 
self.rep_level_decoder.as_mut() {
                 Some(reader) => {
                     let out = rep_levels
                         .as_mut()
                         .ok_or_else(|| general_err!("must specify repetition 
levels"))?;
 
-                    let (mut records_read, levels_read) = 
reader.read_rep_levels(
-                        out,
-                        total_levels_read..total_levels_read + levels_to_read,
-                        remaining_records,
-                    )?;
+                    let (mut records_read, levels_read) =
+                        reader.read_rep_levels(out, remaining_records, 
remaining_levels)?;
 
                     if levels_read == remaining_levels && 
self.has_record_delimiter {
                         // Reached end of page, which implies records_read < 
remaining_records
@@ -264,7 +249,7 @@ where
                     (records_read, levels_read)
                 }
                 None => {
-                    let min = remaining_records.min(levels_to_read);
+                    let min = remaining_records.min(remaining_levels);
                     (min, min)
                 }
             };
@@ -275,26 +260,18 @@ where
                         .as_mut()
                         .ok_or_else(|| general_err!("must specify definition 
levels"))?;
 
-                    let read = reader
-                        .read_def_levels(out, 
total_levels_read..total_levels_read + levels_read)?;
+                    let (values_read, levels_read) = 
reader.read_def_levels(out, levels_to_read)?;
 
-                    if read != levels_read {
+                    if levels_read != levels_to_read {
                         return Err(general_err!("insufficient definition 
levels read from column - expected {rep_levels}, got {read}"));
                     }
 
-                    let null_count = out.count_nulls(
-                        total_levels_read..total_levels_read + read,
-                        self.descr.max_def_level(),
-                    );
-                    levels_read - null_count
+                    values_read
                 }
-                None => levels_read,
+                None => levels_to_read,
             };
 
-            let values_read = self.values_decoder.read(
-                values,
-                total_values_read..total_values_read + values_to_read,
-            )?;
+            let values_read = self.values_decoder.read(values, 
values_to_read)?;
 
             if values_read != values_to_read {
                 return Err(general_err!(
@@ -302,9 +279,9 @@ where
                 ));
             }
 
-            self.num_decoded_values += levels_read;
+            self.num_decoded_values += levels_to_read;
             total_records_read += records_read;
-            total_levels_read += levels_read;
+            total_levels_read += levels_to_read;
             total_values_read += values_read;
         }
 
@@ -389,9 +366,7 @@ where
             }
 
             let (values_read, def_levels_read) = match 
self.def_level_decoder.as_mut() {
-                Some(decoder) => {
-                    decoder.skip_def_levels(rep_levels_read, 
self.descr.max_def_level())?
-                }
+                Some(decoder) => decoder.skip_def_levels(rep_levels_read)?,
                 None => (rep_levels_read, rep_levels_read),
             };
 
@@ -1016,34 +991,22 @@ mod tests {
 
     #[test]
     fn test_read_batch_values_only() {
-        test_read_batch_int32(16, &mut [0; 10], None, None); // < batch_size
-        test_read_batch_int32(16, &mut [0; 16], None, None); // == batch_size
-        test_read_batch_int32(16, &mut [0; 51], None, None); // > batch_size
+        test_read_batch_int32(16, 0, 0);
     }
 
     #[test]
     fn test_read_batch_values_def_levels() {
-        test_read_batch_int32(16, &mut [0; 10], Some(&mut [0; 10]), None);
-        test_read_batch_int32(16, &mut [0; 16], Some(&mut [0; 16]), None);
-        test_read_batch_int32(16, &mut [0; 51], Some(&mut [0; 51]), None);
+        test_read_batch_int32(16, MAX_DEF_LEVEL, 0);
     }
 
     #[test]
     fn test_read_batch_values_rep_levels() {
-        test_read_batch_int32(16, &mut [0; 10], None, Some(&mut [0; 10]));
-        test_read_batch_int32(16, &mut [0; 16], None, Some(&mut [0; 16]));
-        test_read_batch_int32(16, &mut [0; 51], None, Some(&mut [0; 51]));
-    }
-
-    #[test]
-    fn test_read_batch_different_buf_sizes() {
-        test_read_batch_int32(16, &mut [0; 8], Some(&mut [0; 9]), Some(&mut 
[0; 7]));
-        test_read_batch_int32(16, &mut [0; 1], Some(&mut [0; 9]), Some(&mut 
[0; 3]));
+        test_read_batch_int32(16, 0, MAX_REP_LEVEL);
     }
 
     #[test]
     fn test_read_batch_values_def_rep_levels() {
-        test_read_batch_int32(128, &mut [0; 128], Some(&mut [0; 128]), 
Some(&mut [0; 128]));
+        test_read_batch_int32(128, MAX_DEF_LEVEL, MAX_REP_LEVEL);
     }
 
     #[test]
@@ -1065,9 +1028,6 @@ mod tests {
         let num_pages = 2;
         let num_levels = 4;
         let batch_size = 5;
-        let values = &mut vec![0; 7];
-        let def_levels = &mut vec![0; 7];
-        let rep_levels = &mut vec![0; 7];
 
         let mut tester = ColumnReaderTester::<Int32Type>::new();
         tester.test_read_batch(
@@ -1078,9 +1038,6 @@ mod tests {
             batch_size,
             std::i32::MIN,
             std::i32::MAX,
-            values,
-            Some(def_levels),
-            Some(rep_levels),
             false,
         );
     }
@@ -1153,24 +1110,8 @@ mod tests {
     //
     // This is a high level wrapper on `ColumnReaderTester` that allows us to 
specify some
     // boilerplate code for setting up definition/repetition levels and column 
descriptor.
-    fn test_read_batch_int32(
-        batch_size: usize,
-        values: &mut [i32],
-        def_levels: Option<&mut [i16]>,
-        rep_levels: Option<&mut [i16]>,
-    ) {
+    fn test_read_batch_int32(batch_size: usize, max_def_level: i16, 
max_rep_level: i16) {
         let primitive_type = get_test_int32_type();
-        // make field is required based on provided slices of levels
-        let max_def_level = if def_levels.is_some() {
-            MAX_DEF_LEVEL
-        } else {
-            0
-        };
-        let max_rep_level = if rep_levels.is_some() {
-            MAX_REP_LEVEL
-        } else {
-            0
-        };
 
         let desc = Arc::new(ColumnDescriptor::new(
             Arc::new(primitive_type),
@@ -1178,6 +1119,7 @@ mod tests {
             max_rep_level,
             ColumnPath::new(Vec::new()),
         ));
+
         let mut tester = ColumnReaderTester::<Int32Type>::new();
         tester.test_read_batch(
             desc,
@@ -1187,9 +1129,6 @@ mod tests {
             batch_size,
             i32::MIN,
             i32::MAX,
-            values,
-            def_levels,
-            rep_levels,
             false,
         );
     }
@@ -1317,21 +1256,8 @@ mod tests {
             max: T::T,
             use_v2: bool,
         ) {
-            let mut def_levels = vec![0; num_levels * num_pages];
-            let mut rep_levels = vec![0; num_levels * num_pages];
-            let mut values = vec![T::T::default(); num_levels * num_pages];
             self.test_read_batch(
-                desc,
-                encoding,
-                num_pages,
-                num_levels,
-                batch_size,
-                min,
-                max,
-                &mut values,
-                Some(&mut def_levels),
-                Some(&mut rep_levels),
-                use_v2,
+                desc, encoding, num_pages, num_levels, batch_size, min, max, 
use_v2,
             );
         }
 
@@ -1347,9 +1273,6 @@ mod tests {
             batch_size: usize,
             min: T::T,
             max: T::T,
-            values: &mut [T::T],
-            mut def_levels: Option<&mut [i16]>,
-            mut rep_levels: Option<&mut [i16]>,
             use_v2: bool,
         ) {
             let mut pages = VecDeque::new();
@@ -1372,18 +1295,19 @@ mod tests {
             let column_reader: ColumnReader = get_column_reader(desc, 
Box::new(page_reader));
             let mut typed_column_reader = 
get_typed_column_reader::<T>(column_reader);
 
+            let mut values = Vec::new();
+            let mut def_levels = Vec::new();
+            let mut rep_levels = Vec::new();
+
             let mut curr_values_read = 0;
             let mut curr_levels_read = 0;
             loop {
-                let actual_def_levels = def_levels.as_mut().map(|vec| &mut 
vec[curr_levels_read..]);
-                let actual_rep_levels = rep_levels.as_mut().map(|vec| &mut 
vec[curr_levels_read..]);
-
                 let (_, values_read, levels_read) = typed_column_reader
                     .read_records(
                         batch_size,
-                        actual_def_levels,
-                        actual_rep_levels,
-                        &mut values[curr_values_read..],
+                        Some(&mut def_levels),
+                        Some(&mut rep_levels),
+                        &mut values,
                     )
                     .expect("read_batch() should be OK");
 
@@ -1395,38 +1319,18 @@ mod tests {
                 }
             }
 
-            assert!(
-                values.len() >= curr_values_read,
-                "values.len() >= values_read"
-            );
-            assert_eq!(
-                &values[0..curr_values_read],
-                &self.values[0..curr_values_read],
-                "values content doesn't match"
-            );
+            assert_eq!(values, self.values, "values content doesn't match");
 
             if max_def_level > 0 {
-                let levels = def_levels.as_ref().unwrap();
-                assert!(
-                    levels.len() >= curr_levels_read,
-                    "def_levels.len() >= levels_read"
-                );
                 assert_eq!(
-                    &levels[0..curr_levels_read],
-                    &self.def_levels[0..curr_levels_read],
+                    def_levels, self.def_levels,
                     "definition levels content doesn't match"
                 );
             }
 
             if max_rep_level > 0 {
-                let levels = rep_levels.as_ref().unwrap();
-                assert!(
-                    levels.len() >= curr_levels_read,
-                    "rep_levels.len() >= levels_read"
-                );
                 assert_eq!(
-                    &levels[0..curr_levels_read],
-                    &self.rep_levels[0..curr_levels_read],
+                    rep_levels, self.rep_levels,
                     "repetition levels content doesn't match"
                 );
             }
diff --git a/parquet/src/column/reader/decoder.rs 
b/parquet/src/column/reader/decoder.rs
index ef62724689..9889973b67 100644
--- a/parquet/src/column/reader/decoder.rs
+++ b/parquet/src/column/reader/decoder.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use std::collections::HashMap;
-use std::ops::Range;
 
 use bytes::Bytes;
 
@@ -30,52 +29,18 @@ use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use crate::util::bit_util::{num_required_bits, BitReader};
 
-/// A slice of levels buffer data that is written to by a 
[`ColumnLevelDecoder`]
-pub trait LevelsBufferSlice {
-    /// Returns the capacity of this slice or `usize::MAX` if no limit
-    fn capacity(&self) -> usize;
-
-    /// Count the number of levels in `range` not equal to `max_level`
-    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize;
-}
-
-impl LevelsBufferSlice for [i16] {
-    fn capacity(&self) -> usize {
-        self.len()
-    }
-
-    fn count_nulls(&self, range: Range<usize>, max_level: i16) -> usize {
-        self[range].iter().filter(|i| **i != max_level).count()
-    }
-}
-
-/// A slice of values buffer data that is written to by a 
[`ColumnValueDecoder`]
-pub trait ValuesBufferSlice {
-    /// Returns the capacity of this slice or `usize::MAX` if no limit
-    fn capacity(&self) -> usize;
-}
-
-impl<T> ValuesBufferSlice for [T] {
-    fn capacity(&self) -> usize {
-        self.len()
-    }
-}
-
-/// Decodes level data to a [`LevelsBufferSlice`]
+/// Decodes level data
 pub trait ColumnLevelDecoder {
-    type Slice: LevelsBufferSlice + ?Sized;
+    type Buffer;
 
     /// Set data for this [`ColumnLevelDecoder`]
     fn set_data(&mut self, encoding: Encoding, data: Bytes);
 }
 
 pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
-    /// Read up to `max_records` of repetition level data into `out[range]` 
returning the number
+    /// Read up to `max_records` of repetition level data into `out` returning 
the number
     /// of complete records and levels read
     ///
-    /// `range` is provided by the caller to allow for types such as 
default-initialized `[T]`
-    /// that only track capacity and not length
-    ///
     /// A record only ends when the data contains a subsequent repetition 
level of 0,
     /// it is therefore left to the caller to delimit the final record in a 
column
     ///
@@ -84,9 +49,9 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
     /// Implementations may panic if `range` overlaps with already written data
     fn read_rep_levels(
         &mut self,
-        out: &mut Self::Slice,
-        range: Range<usize>,
-        max_records: usize,
+        out: &mut Self::Buffer,
+        num_records: usize,
+        num_levels: usize,
     ) -> Result<(usize, usize)>;
 
     /// Skips over up to `num_levels` repetition levels corresponding to 
`num_records` records,
@@ -103,27 +68,28 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder {
 }
 
 pub trait DefinitionLevelDecoder: ColumnLevelDecoder {
-    /// Read definition level data into `out[range]` returning the number of 
levels read
+    /// Read up to `num_levels` definition levels into `out`
     ///
-    /// `range` is provided by the caller to allow for types such as 
default-initialized `[T]`
-    /// that only track capacity and not length
+    /// Returns the number of values skipped, and the number of levels skipped
     ///
     /// # Panics
     ///
     /// Implementations may panic if `range` overlaps with already written data
-    ///
-    // TODO: Should this return the number of nulls
-    fn read_def_levels(&mut self, out: &mut Self::Slice, range: Range<usize>) 
-> Result<usize>;
+    fn read_def_levels(
+        &mut self,
+        out: &mut Self::Buffer,
+        num_levels: usize,
+    ) -> Result<(usize, usize)>;
 
     /// Skips over `num_levels` definition levels
     ///
     /// Returns the number of values skipped, and the number of levels skipped
-    fn skip_def_levels(&mut self, num_levels: usize, max_def_level: i16) -> 
Result<(usize, usize)>;
+    fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>;
 }
 
-/// Decodes value data to a [`ValuesBufferSlice`]
+/// Decodes value data
 pub trait ColumnValueDecoder {
-    type Slice: ValuesBufferSlice + ?Sized;
+    type Buffer;
 
     /// Create a new [`ColumnValueDecoder`]
     fn new(col: &ColumnDescPtr) -> Self;
@@ -156,16 +122,13 @@ pub trait ColumnValueDecoder {
         num_values: Option<usize>,
     ) -> Result<()>;
 
-    /// Read values data into `out[range]` returning the number of values read
-    ///
-    /// `range` is provided by the caller to allow for types such as 
default-initialized `[T]`
-    /// that only track capacity and not length
+    /// Read up to `num_values` values into `out`
     ///
     /// # Panics
     ///
     /// Implementations may panic if `range` overlaps with already written data
     ///
-    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> 
Result<usize>;
+    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> 
Result<usize>;
 
     /// Skips over `num_values` values
     ///
@@ -184,7 +147,7 @@ pub struct ColumnValueDecoderImpl<T: DataType> {
 }
 
 impl<T: DataType> ColumnValueDecoder for ColumnValueDecoderImpl<T> {
-    type Slice = [T::T];
+    type Buffer = Vec<T::T>;
 
     fn new(descr: &ColumnDescPtr) -> Self {
         Self {
@@ -258,7 +221,7 @@ impl<T: DataType> ColumnValueDecoder for 
ColumnValueDecoderImpl<T> {
         Ok(())
     }
 
-    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> 
Result<usize> {
+    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> 
Result<usize> {
         let encoding = self
             .current_encoding
             .expect("current_encoding should be set");
@@ -268,7 +231,12 @@ impl<T: DataType> ColumnValueDecoder for 
ColumnValueDecoderImpl<T> {
             .get_mut(&encoding)
             .unwrap_or_else(|| panic!("decoder for encoding {encoding} should 
be set"));
 
-        current_decoder.get(&mut out[range])
+        // TODO: Push vec into decoder (#5177)
+        let start = out.len();
+        out.resize(start + num_values, T::T::default());
+        let read = current_decoder.get(&mut out[start..])?;
+        out.truncate(start + read);
+        Ok(read)
     }
 
     fn skip_values(&mut self, num_values: usize) -> Result<usize> {
@@ -319,6 +287,7 @@ impl LevelDecoder {
 pub struct DefinitionLevelDecoderImpl {
     decoder: Option<LevelDecoder>,
     bit_width: u8,
+    max_level: i16,
 }
 
 impl DefinitionLevelDecoderImpl {
@@ -327,12 +296,13 @@ impl DefinitionLevelDecoderImpl {
         Self {
             decoder: None,
             bit_width,
+            max_level,
         }
     }
 }
 
 impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
-    type Slice = [i16];
+    type Buffer = Vec<i16>;
 
     fn set_data(&mut self, encoding: Encoding, data: Bytes) {
         self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width))
@@ -340,11 +310,23 @@ impl ColumnLevelDecoder for DefinitionLevelDecoderImpl {
 }
 
 impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl {
-    fn read_def_levels(&mut self, out: &mut Self::Slice, range: Range<usize>) 
-> Result<usize> {
-        self.decoder.as_mut().unwrap().read(&mut out[range])
+    fn read_def_levels(
+        &mut self,
+        out: &mut Self::Buffer,
+        num_levels: usize,
+    ) -> Result<(usize, usize)> {
+        // TODO: Push vec into decoder (#5177)
+        let start = out.len();
+        out.resize(start + num_levels, 0);
+        let levels_read = self.decoder.as_mut().unwrap().read(&mut 
out[start..])?;
+        out.truncate(start + levels_read);
+
+        let iter = out.iter().skip(start);
+        let values_read = iter.filter(|x| **x == self.max_level).count();
+        Ok((values_read, levels_read))
     }
 
-    fn skip_def_levels(&mut self, num_levels: usize, max_def_level: i16) -> 
Result<(usize, usize)> {
+    fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)> 
{
         let mut level_skip = 0;
         let mut value_skip = 0;
         let mut buf: Vec<i16> = vec![];
@@ -353,14 +335,14 @@ impl DefinitionLevelDecoder for 
DefinitionLevelDecoderImpl {
 
             let to_read = remaining_levels.min(SKIP_BUFFER_SIZE);
             buf.resize(to_read, 0);
-            let read = self.read_def_levels(&mut buf, 0..to_read)?;
-            if read == 0 {
+            let (values_read, levels_read) = self.read_def_levels(&mut buf, 
to_read)?;
+            if levels_read == 0 {
                 // Reached end of page
                 break;
             }
 
-            level_skip += read;
-            value_skip += buf[..read].iter().filter(|x| **x == 
max_def_level).count();
+            level_skip += levels_read;
+            value_skip += values_read;
         }
 
         Ok((value_skip, level_skip))
@@ -423,7 +405,7 @@ impl RepetitionLevelDecoderImpl {
 }
 
 impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
-    type Slice = [i16];
+    type Buffer = Vec<i16>;
 
     fn set_data(&mut self, encoding: Encoding, data: Bytes) {
         self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width));
@@ -435,16 +417,14 @@ impl ColumnLevelDecoder for RepetitionLevelDecoderImpl {
 impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl {
     fn read_rep_levels(
         &mut self,
-        out: &mut Self::Slice,
-        range: Range<usize>,
-        max_records: usize,
+        out: &mut Self::Buffer,
+        num_records: usize,
+        num_levels: usize,
     ) -> Result<(usize, usize)> {
-        let output = &mut out[range];
-        let max_levels = output.len();
         let mut total_records_read = 0;
         let mut total_levels_read = 0;
 
-        while total_records_read < max_records && total_levels_read < 
max_levels {
+        while total_records_read < num_records && total_levels_read < 
num_levels {
             if self.buffer_len == self.buffer_offset {
                 self.fill_buf()?;
                 if self.buffer_len == 0 {
@@ -453,11 +433,11 @@ impl RepetitionLevelDecoder for 
RepetitionLevelDecoderImpl {
             }
 
             let (partial, records_read, levels_read) = self.count_records(
-                max_records - total_records_read,
-                max_levels - total_levels_read,
+                num_records - total_records_read,
+                num_levels - total_levels_read,
             );
 
-            output[total_levels_read..total_levels_read + 
levels_read].copy_from_slice(
+            out.extend_from_slice(
                 &self.buffer[self.buffer_offset..self.buffer_offset + 
levels_read],
             );
 
@@ -550,13 +530,13 @@ mod tests {
                 let (records_read, levels_read) = if skip {
                     decoder.skip_rep_levels(records, remaining_levels).unwrap()
                 } else {
-                    let mut decoded = vec![0; remaining_levels];
+                    let mut decoded = Vec::new();
                     let (records_read, levels_read) = decoder
-                        .read_rep_levels(&mut decoded, 0..remaining_levels, 
records)
+                        .read_rep_levels(&mut decoded, records, 
remaining_levels)
                         .unwrap();
 
                     assert_eq!(
-                        decoded[..levels_read],
+                        decoded,
                         encoded[encoded.len() - 
remaining_levels..][..levels_read]
                     );
                     (records_read, levels_read)
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 9f476595fb..597386b336 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -3292,19 +3292,20 @@ mod tests {
             )
             .unwrap(),
         );
-        let reader = get_test_column_reader::<T>(page_reader, max_def_level, 
max_rep_level);
-
-        let mut actual_values = vec![T::T::default(); max_batch_size];
-        let mut actual_def_levels = def_levels.map(|_| vec![0i16; 
max_batch_size]);
-        let mut actual_rep_levels = rep_levels.map(|_| vec![0i16; 
max_batch_size]);
-
-        let (_, values_read, levels_read) = read_fully(
-            reader,
-            max_batch_size,
-            actual_def_levels.as_mut(),
-            actual_rep_levels.as_mut(),
-            actual_values.as_mut_slice(),
-        );
+        let mut reader = get_test_column_reader::<T>(page_reader, 
max_def_level, max_rep_level);
+
+        let mut actual_values = Vec::with_capacity(max_batch_size);
+        let mut actual_def_levels = def_levels.map(|_| 
Vec::with_capacity(max_batch_size));
+        let mut actual_rep_levels = rep_levels.map(|_| 
Vec::with_capacity(max_batch_size));
+
+        let (_, values_read, levels_read) = reader
+            .read_records(
+                max_batch_size,
+                actual_def_levels.as_mut(),
+                actual_rep_levels.as_mut(),
+                &mut actual_values,
+            )
+            .unwrap();
 
         // Assert values, definition and repetition levels.
 
@@ -3367,22 +3368,6 @@ mod tests {
         assert_eq!(meta.encodings(), &encodings);
     }
 
-    /// Reads one batch of data, considering that batch is large enough to 
capture all of
-    /// the values and levels.
-    fn read_fully<T: DataType>(
-        mut reader: ColumnReaderImpl<T>,
-        batch_size: usize,
-        mut def_levels: Option<&mut Vec<i16>>,
-        mut rep_levels: Option<&mut Vec<i16>>,
-        values: &mut [T::T],
-    ) -> (usize, usize, usize) {
-        let actual_def_levels = def_levels.as_mut().map(|vec| &mut vec[..]);
-        let actual_rep_levels = rep_levels.as_mut().map(|vec| &mut vec[..]);
-        reader
-            .read_records(batch_size, actual_def_levels, actual_rep_levels, 
values)
-            .unwrap()
-    }
-
     /// Returns column writer.
     fn get_test_column_writer<'a, T: DataType>(
         page_writer: Box<dyn PageWriter + 'a>,
diff --git a/parquet/src/file/serialized_reader.rs 
b/parquet/src/file/serialized_reader.rs
index fbb172d3b3..468433f31d 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -1739,8 +1739,8 @@ mod tests {
         let row_group_reader = reader.get_row_group(0).unwrap();
         match row_group_reader.get_column_reader(0).unwrap() {
             ColumnReader::Int64ColumnReader(mut reader) => {
-                let mut buffer = [0; 1024];
-                let mut def_levels = [0; 1024];
+                let mut buffer = Vec::with_capacity(1024);
+                let mut def_levels = Vec::with_capacity(1024);
                 let (num_records, num_values, num_levels) = reader
                     .read_records(1024, Some(&mut def_levels), None, &mut 
buffer)
                     .unwrap();
@@ -1750,7 +1750,7 @@ mod tests {
                 assert_eq!(num_levels, 513);
 
                 let expected: Vec<i64> = (1..514).collect();
-                assert_eq!(&buffer[..513], &expected);
+                assert_eq!(&buffer, &expected);
             }
             _ => unreachable!(),
         }
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index f0b75f3025..3534bab816 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -1700,12 +1700,14 @@ mod tests {
         let test_read = |reader: SerializedFileReader<Bytes>| {
             let row_group = reader.get_row_group(0).unwrap();
 
-            let mut out = [0; 4];
+            let mut out = Vec::with_capacity(4);
             let c1 = row_group.get_column_reader(0).unwrap();
             let mut c1 = get_typed_column_reader::<Int32Type>(c1);
             c1.read_records(4, None, None, &mut out).unwrap();
             assert_eq!(out, column_data[0]);
 
+            out.clear();
+
             let c2 = row_group.get_column_reader(1).unwrap();
             let mut c2 = get_typed_column_reader::<Int32Type>(c2);
             c2.read_records(4, None, None, &mut out).unwrap();
diff --git a/parquet/src/record/triplet.rs b/parquet/src/record/triplet.rs
index 7647b23e28..902641c08b 100644
--- a/parquet/src/record/triplet.rs
+++ b/parquet/src/record/triplet.rs
@@ -296,17 +296,19 @@ impl<T: DataType> TypedTripletIter<T> {
         // and therefore not advance `self.triplets_left`
         while self.curr_triplet_index >= self.triplets_left {
             let (records_read, values_read, levels_read) = {
-                // Get slice of definition levels, if available
-                let def_levels = self.def_levels.as_mut().map(|vec| &mut 
vec[..]);
-
-                // Get slice of repetition levels, if available
-                let rep_levels = self.rep_levels.as_mut().map(|vec| &mut 
vec[..]);
+                self.values.clear();
+                if let Some(x) = &mut self.def_levels {
+                    x.clear()
+                }
+                if let Some(x) = &mut self.rep_levels {
+                    x.clear()
+                }
 
                 // Buffer triplets
                 self.reader.read_records(
                     self.batch_size,
-                    def_levels,
-                    rep_levels,
+                    self.def_levels.as_mut(),
+                    self.rep_levels.as_mut(),
                     &mut self.values,
                 )?
             };
@@ -333,6 +335,7 @@ impl<T: DataType> TypedTripletIter<T> {
                 // Note: if values_read == 0, then spacing will not be 
triggered
                 let mut idx = values_read;
                 let def_levels = self.def_levels.as_ref().unwrap();
+                self.values.resize(levels_read, T::T::default());
                 for i in 0..levels_read {
                     if def_levels[levels_read - i - 1] == self.max_def_level {
                         idx -= 1; // This is done to avoid usize becoming a 
negative value
diff --git a/parquet_derive/src/parquet_field.rs 
b/parquet_derive/src/parquet_field.rs
index bb33b31968..8d759d11c4 100644
--- a/parquet_derive/src/parquet_field.rs
+++ b/parquet_derive/src/parquet_field.rs
@@ -243,15 +243,12 @@ impl Field {
     pub fn reader_snippet(&self) -> proc_macro2::TokenStream {
         let ident = &self.ident;
         let column_reader = self.ty.column_reader();
-        let parquet_type = self.ty.physical_type_as_rust();
 
         // generate the code to read the column into a vector `vals`
         let write_batch_expr = quote! {
-            let mut vals_vec = Vec::new();
-            vals_vec.resize(num_records, Default::default());
-            let mut vals: &mut [#parquet_type] = vals_vec.as_mut_slice();
+            let mut vals = Vec::new();
             if let #column_reader(mut typed) = column_reader {
-                typed.read_records(num_records, None, None, vals)?;
+                typed.read_records(num_records, None, None, &mut vals)?;
             } else {
                 panic!("Schema and struct disagree on type for {}", 
stringify!{#ident});
             }
@@ -646,23 +643,6 @@ impl Type {
         }
     }
 
-    fn physical_type_as_rust(&self) -> proc_macro2::TokenStream {
-        use parquet::basic::Type as BasicType;
-
-        match self.physical_type() {
-            BasicType::BOOLEAN => quote! { bool },
-            BasicType::INT32 => quote! { i32 },
-            BasicType::INT64 => quote! { i64 },
-            BasicType::INT96 => unimplemented!("96-bit int currently is not 
supported"),
-            BasicType::FLOAT => quote! { f32 },
-            BasicType::DOUBLE => quote! { f64 },
-            BasicType::BYTE_ARRAY => quote! { ::parquet::data_type::ByteArray 
},
-            BasicType::FIXED_LEN_BYTE_ARRAY => {
-                quote! { ::parquet::data_type::FixedLenByteArray }
-            }
-        }
-    }
-
     fn logical_type(&self) -> proc_macro2::TokenStream {
         let last_part = self.last_part();
         let leaf_type = self.leaf_type_recursive();
@@ -877,11 +857,9 @@ mod test {
             snippet,
             (quote! {
                  {
-                     let mut vals_vec = Vec::new();
-                     vals_vec.resize(num_records, Default::default());
-                     let mut vals: &mut[i64] = vals_vec.as_mut_slice();
+                     let mut vals = Vec::new();
                      if let ColumnReader::Int64ColumnReader(mut typed) = 
column_reader {
-                         typed.read_records(num_records, None, None, vals)?;
+                         typed.read_records(num_records, None, None, &mut 
vals)?;
                      } else {
                          panic!("Schema and struct disagree on type for {}", 
stringify!{ counter });
                      }
@@ -1256,11 +1234,9 @@ mod test {
         let when = Field::from(&fields[0]);
         assert_eq!(when.reader_snippet().to_string(),(quote!{
             {
-                let mut vals_vec = Vec::new();
-                vals_vec.resize(num_records, Default::default());
-                let mut vals: &mut[i64] = vals_vec.as_mut_slice();
+                let mut vals = Vec::new();
                 if let ColumnReader::Int64ColumnReader(mut typed) = 
column_reader {
-                    typed.read_records(num_records, None, None, vals)?;
+                    typed.read_records(num_records, None, None, &mut vals)?;
                 } else {
                     panic!("Schema and struct disagree on type for {}", 
stringify!{ henceforth });
                 }
@@ -1326,11 +1302,9 @@ mod test {
         let when = Field::from(&fields[0]);
         assert_eq!(when.reader_snippet().to_string(),(quote!{
             {
-                let mut vals_vec = Vec::new();
-                vals_vec.resize(num_records, Default::default());
-                let mut vals: &mut [i32] = vals_vec.as_mut_slice();
+                let mut vals = Vec::new();
                 if let ColumnReader::Int32ColumnReader(mut typed) = 
column_reader {
-                    typed.read_records(num_records, None, None, vals)?;
+                    typed.read_records(num_records, None, None, &mut vals)?;
                 } else {
                     panic!("Schema and struct disagree on type for {}", 
stringify!{ henceforth });
                 }
@@ -1396,11 +1370,9 @@ mod test {
         let when = Field::from(&fields[0]);
         assert_eq!(when.reader_snippet().to_string(),(quote!{
             {
-                let mut vals_vec = Vec::new();
-                vals_vec.resize(num_records, Default::default());
-                let mut vals: &mut [::parquet::data_type::ByteArray] = 
vals_vec.as_mut_slice();
+                let mut vals = Vec::new();
                 if let ColumnReader::ByteArrayColumnReader(mut typed) = 
column_reader {
-                    typed.read_records(num_records, None, None, vals)?;
+                    typed.read_records(num_records, None, None, &mut vals)?;
                 } else {
                     panic!("Schema and struct disagree on type for {}", 
stringify!{ unique_id });
                 }

Reply via email to