This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a213bc36f Remove ScalarBuffer from parquet (#1849) (#5177) (#5178)
2a213bc36f is described below

commit 2a213bc36fdbbe8a51d4307b3c55be856e810af4
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Dec 8 14:53:44 2023 +0000

    Remove ScalarBuffer from parquet (#1849) (#5177) (#5178)
---
 parquet/src/arrow/array_reader/byte_array.rs       |  66 ++++------
 .../arrow/array_reader/byte_array_dictionary.rs    |  45 +++----
 .../src/arrow/array_reader/fixed_len_byte_array.rs |  53 +++-----
 parquet/src/arrow/array_reader/null_array.rs       |   9 +-
 parquet/src/arrow/array_reader/primitive_array.rs  | 120 ++++++++---------
 parquet/src/arrow/buffer/dictionary_buffer.rs      |  41 +++---
 parquet/src/arrow/buffer/offset_buffer.rs          |  37 +++---
 parquet/src/arrow/record_reader/buffer.rs          | 144 +++------------------
 .../src/arrow/record_reader/definition_levels.rs   |  12 +-
 parquet/src/arrow/record_reader/mod.rs             |  48 +++----
 10 files changed, 200 insertions(+), 375 deletions(-)

diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index 01666c0af4..debe0d6109 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -19,7 +19,6 @@ use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
 use crate::arrow::buffer::bit_util::sign_extend_be;
 use crate::arrow::buffer::offset_buffer::OffsetBuffer;
 use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
-use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::{ConvertedType, Encoding};
@@ -77,7 +76,7 @@ pub fn make_byte_array_reader(
 }
 
 /// An [`ArrayReader`] for variable length byte arrays
-struct ByteArrayReader<I: ScalarValue> {
+struct ByteArrayReader<I: OffsetSizeTrait> {
     data_type: ArrowType,
     pages: Box<dyn PageIterator>,
     def_levels_buffer: Option<Buffer>,
@@ -85,14 +84,11 @@ struct ByteArrayReader<I: ScalarValue> {
     record_reader: GenericRecordReader<OffsetBuffer<I>, 
ByteArrayColumnValueDecoder<I>>,
 }
 
-impl<I: ScalarValue> ByteArrayReader<I> {
+impl<I: OffsetSizeTrait> ByteArrayReader<I> {
     fn new(
         pages: Box<dyn PageIterator>,
         data_type: ArrowType,
-        record_reader: GenericRecordReader<
-            OffsetBuffer<I>,
-            ByteArrayColumnValueDecoder<I>,
-        >,
+        record_reader: GenericRecordReader<OffsetBuffer<I>, 
ByteArrayColumnValueDecoder<I>>,
     ) -> Self {
         Self {
             data_type,
@@ -104,7 +100,7 @@ impl<I: ScalarValue> ByteArrayReader<I> {
     }
 }
 
-impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
+impl<I: OffsetSizeTrait> ArrayReader for ByteArrayReader<I> {
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -167,15 +163,13 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
 }
 
 /// A [`ColumnValueDecoder`] for variable length byte arrays
-struct ByteArrayColumnValueDecoder<I: ScalarValue> {
+struct ByteArrayColumnValueDecoder<I: OffsetSizeTrait> {
     dict: Option<OffsetBuffer<I>>,
     decoder: Option<ByteArrayDecoder>,
     validate_utf8: bool,
 }
 
-impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
-    for ByteArrayColumnValueDecoder<I>
-{
+impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayColumnValueDecoder<I> 
{
     type Slice = OffsetBuffer<I>;
 
     fn new(desc: &ColumnDescPtr) -> Self {
@@ -275,17 +269,15 @@ impl ByteArrayDecoder {
                 num_values,
                 validate_utf8,
             )),
-            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
-                ByteArrayDecoder::Dictionary(ByteArrayDecoderDictionary::new(
-                    data, num_levels, num_values,
-                ))
-            }
+            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => 
ByteArrayDecoder::Dictionary(
+                ByteArrayDecoderDictionary::new(data, num_levels, num_values),
+            ),
             Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteArrayDecoder::DeltaLength(
                 ByteArrayDecoderDeltaLength::new(data, validate_utf8)?,
             ),
-            Encoding::DELTA_BYTE_ARRAY => ByteArrayDecoder::DeltaByteArray(
-                ByteArrayDecoderDelta::new(data, validate_utf8)?,
-            ),
+            Encoding::DELTA_BYTE_ARRAY => {
+                ByteArrayDecoder::DeltaByteArray(ByteArrayDecoderDelta::new(data, validate_utf8)?)
+            }
             _ => {
                 return Err(general_err!(
                     "unsupported encoding for byte array: {}",
@@ -298,7 +290,7 @@ impl ByteArrayDecoder {
     }
 
     /// Read up to `len` values to `out` with the optional dictionary
-    pub fn read<I: OffsetSizeTrait + ScalarValue>(
+    pub fn read<I: OffsetSizeTrait>(
         &mut self,
         out: &mut OffsetBuffer<I>,
         len: usize,
@@ -307,8 +299,8 @@ impl ByteArrayDecoder {
         match self {
             ByteArrayDecoder::Plain(d) => d.read(out, len),
             ByteArrayDecoder::Dictionary(d) => {
-                let dict = dict
-                    .ok_or_else(|| general_err!("missing dictionary page for 
column"))?;
+                let dict =
+                    dict.ok_or_else(|| general_err!("missing dictionary page 
for column"))?;
 
                 d.read(out, dict, len)
             }
@@ -318,7 +310,7 @@ impl ByteArrayDecoder {
     }
 
     /// Skip `len` values
-    pub fn skip<I: OffsetSizeTrait + ScalarValue>(
+    pub fn skip<I: OffsetSizeTrait>(
         &mut self,
         len: usize,
         dict: Option<&OffsetBuffer<I>>,
@@ -326,8 +318,8 @@ impl ByteArrayDecoder {
         match self {
             ByteArrayDecoder::Plain(d) => d.skip(len),
             ByteArrayDecoder::Dictionary(d) => {
-                let dict = dict
-                    .ok_or_else(|| general_err!("missing dictionary page for 
column"))?;
+                let dict =
+                    dict.ok_or_else(|| general_err!("missing dictionary page 
for column"))?;
 
                 d.skip(dict, len)
             }
@@ -363,7 +355,7 @@ impl ByteArrayDecoderPlain {
         }
     }
 
-    pub fn read<I: OffsetSizeTrait + ScalarValue>(
+    pub fn read<I: OffsetSizeTrait>(
         &mut self,
         output: &mut OffsetBuffer<I>,
         len: usize,
@@ -392,8 +384,7 @@ impl ByteArrayDecoderPlain {
             if self.offset + 4 > buf.len() {
                 return Err(ParquetError::EOF("eof decoding byte 
array".into()));
             }
-            let len_bytes: [u8; 4] =
-                buf[self.offset..self.offset + 4].try_into().unwrap();
+            let len_bytes: [u8; 4] = buf[self.offset..self.offset + 
4].try_into().unwrap();
             let len = u32::from_le_bytes(len_bytes);
 
             let start_offset = self.offset + 4;
@@ -424,8 +415,7 @@ impl ByteArrayDecoderPlain {
             if self.offset + 4 > buf.len() {
                 return Err(ParquetError::EOF("eof decoding byte 
array".into()));
             }
-            let len_bytes: [u8; 4] =
-                buf[self.offset..self.offset + 4].try_into().unwrap();
+            let len_bytes: [u8; 4] = buf[self.offset..self.offset + 
4].try_into().unwrap();
             let len = u32::from_le_bytes(len_bytes) as usize;
             skip += 1;
             self.offset = self.offset + 4 + len;
@@ -462,7 +452,7 @@ impl ByteArrayDecoderDeltaLength {
         })
     }
 
-    fn read<I: OffsetSizeTrait + ScalarValue>(
+    fn read<I: OffsetSizeTrait>(
         &mut self,
         output: &mut OffsetBuffer<I>,
         len: usize,
@@ -529,7 +519,7 @@ impl ByteArrayDecoderDelta {
         })
     }
 
-    fn read<I: OffsetSizeTrait + ScalarValue>(
+    fn read<I: OffsetSizeTrait>(
         &mut self,
         output: &mut OffsetBuffer<I>,
         len: usize,
@@ -564,7 +554,7 @@ impl ByteArrayDecoderDictionary {
         }
     }
 
-    fn read<I: OffsetSizeTrait + ScalarValue>(
+    fn read<I: OffsetSizeTrait>(
         &mut self,
         output: &mut OffsetBuffer<I>,
         dict: &OffsetBuffer<I>,
@@ -576,15 +566,11 @@ impl ByteArrayDecoderDictionary {
         }
 
         self.decoder.read(len, |keys| {
-            output.extend_from_dictionary(
-                keys,
-                dict.offsets.as_slice(),
-                dict.values.as_slice(),
-            )
+            output.extend_from_dictionary(keys, dict.offsets.as_slice(), 
dict.values.as_slice())
         })
     }
 
-    fn skip<I: OffsetSizeTrait + ScalarValue>(
+    fn skip<I: OffsetSizeTrait>(
         &mut self,
         dict: &OffsetBuffer<I>,
         to_skip: usize,
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index 0d216fa083..a381223541 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -27,10 +27,8 @@ use bytes::Bytes;
 
 use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, 
ByteArrayDecoderPlain};
 use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
-use crate::arrow::buffer::{
-    dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer,
-};
-use crate::arrow::record_reader::buffer::{BufferQueue, ScalarValue};
+use crate::arrow::buffer::{dictionary_buffer::DictionaryBuffer, 
offset_buffer::OffsetBuffer};
+use crate::arrow::record_reader::buffer::BufferQueue;
 use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::{ConvertedType, Encoding};
@@ -123,7 +121,7 @@ pub fn make_byte_array_dictionary_reader(
 /// An [`ArrayReader`] for dictionary encoded variable length byte arrays
 ///
 /// Will attempt to preserve any dictionary encoding present in the parquet 
data
-struct ByteArrayDictionaryReader<K: ScalarValue, V: ScalarValue> {
+struct ByteArrayDictionaryReader<K: ArrowNativeType, V: OffsetSizeTrait> {
     data_type: ArrowType,
     pages: Box<dyn PageIterator>,
     def_levels_buffer: Option<Buffer>,
@@ -133,16 +131,13 @@ struct ByteArrayDictionaryReader<K: ScalarValue, V: ScalarValue> {
 
 impl<K, V> ByteArrayDictionaryReader<K, V>
 where
-    K: FromBytes + ScalarValue + Ord + ArrowNativeType,
-    V: ScalarValue + OffsetSizeTrait,
+    K: FromBytes + Ord + ArrowNativeType,
+    V: OffsetSizeTrait,
 {
     fn new(
         pages: Box<dyn PageIterator>,
         data_type: ArrowType,
-        record_reader: GenericRecordReader<
-            DictionaryBuffer<K, V>,
-            DictionaryDecoder<K, V>,
-        >,
+        record_reader: GenericRecordReader<DictionaryBuffer<K, V>, 
DictionaryDecoder<K, V>>,
     ) -> Self {
         Self {
             data_type,
@@ -156,8 +151,8 @@ where
 
 impl<K, V> ArrayReader for ByteArrayDictionaryReader<K, V>
 where
-    K: FromBytes + ScalarValue + Ord + ArrowNativeType,
-    V: ScalarValue + OffsetSizeTrait,
+    K: FromBytes + Ord + ArrowNativeType,
+    V: OffsetSizeTrait,
 {
     fn as_any(&self) -> &dyn Any {
         self
@@ -226,16 +221,15 @@ struct DictionaryDecoder<K, V> {
 
 impl<K, V> ColumnValueDecoder for DictionaryDecoder<K, V>
 where
-    K: FromBytes + ScalarValue + Ord + ArrowNativeType,
-    V: ScalarValue + OffsetSizeTrait,
+    K: FromBytes + Ord + ArrowNativeType,
+    V: OffsetSizeTrait,
 {
     type Slice = DictionaryBuffer<K, V>;
 
     fn new(col: &ColumnDescPtr) -> Self {
         let validate_utf8 = col.converted_type() == ConvertedType::UTF8;
 
-        let value_type = match (V::IS_LARGE, col.converted_type() == 
ConvertedType::UTF8)
-        {
+        let value_type = match (V::IS_LARGE, col.converted_type() == 
ConvertedType::UTF8) {
             (true, true) => ArrowType::LargeUtf8,
             (true, false) => ArrowType::LargeBinary,
             (false, true) => ArrowType::Utf8,
@@ -274,8 +268,7 @@ where
 
         let len = num_values as usize;
         let mut buffer = OffsetBuffer::<V>::default();
-        let mut decoder =
-            ByteArrayDecoderPlain::new(buf, len, Some(len), 
self.validate_utf8);
+        let mut decoder = ByteArrayDecoderPlain::new(buf, len, Some(len), 
self.validate_utf8);
         decoder.read(&mut buffer, usize::MAX)?;
 
         let array = buffer.into_array(None, self.value_type.clone());
@@ -339,8 +332,8 @@ where
                     Some(keys) => {
                         // Happy path - can just copy keys
                         // Keys will be validated on conversion to arrow
-                        let keys_slice = keys.spare_capacity_mut(range.start + 
len);
-                        let len = decoder.get_batch(&mut 
keys_slice[range.start..])?;
+                        let keys_slice = keys.get_output_slice(len);
+                        let len = decoder.get_batch(keys_slice)?;
                         *max_remaining_values -= len;
                         Ok(len)
                     }
@@ -360,11 +353,7 @@ where
                         let dict_offsets = dict_buffers[0].typed_data::<V>();
                         let dict_values = dict_buffers[1].as_slice();
 
-                        values.extend_from_dictionary(
-                            &keys[..len],
-                            dict_offsets,
-                            dict_values,
-                        )?;
+                        values.extend_from_dictionary(&keys[..len], 
dict_offsets, dict_values)?;
                         *max_remaining_values -= len;
                         Ok(len)
                     }
@@ -375,9 +364,7 @@ where
 
     fn skip_values(&mut self, num_values: usize) -> Result<usize> {
         match self.decoder.as_mut().expect("decoder set") {
-            MaybeDictionaryDecoder::Fallback(decoder) => {
-                decoder.skip::<V>(num_values, None)
-            }
+            MaybeDictionaryDecoder::Fallback(decoder) => 
decoder.skip::<V>(num_values, None),
             MaybeDictionaryDecoder::Dict {
                 decoder,
                 max_remaining_values,
diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index b846997d36..849aa37c56 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -18,7 +18,7 @@
 use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
 use crate::arrow::buffer::bit_util::{iter_set_bits_rev, sign_extend_be};
 use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
-use crate::arrow::record_reader::buffer::{BufferQueue, ScalarBuffer, 
ValuesBuffer};
+use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer};
 use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::{Encoding, Type};
@@ -162,11 +162,10 @@ impl ArrayReader for FixedLenByteArrayReader {
     fn consume_batch(&mut self) -> Result<ArrayRef> {
         let record_data = self.record_reader.consume_record_data();
 
-        let array_data =
-            ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length 
as i32))
-                .len(self.record_reader.num_values())
-                .add_buffer(record_data)
-                .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
+        let array_data = 
ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
+            .len(self.record_reader.num_values())
+            .add_buffer(record_data)
+            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
 
         let binary = FixedSizeBinaryArray::from(unsafe { 
array_data.build_unchecked() });
 
@@ -197,19 +196,13 @@ impl ArrayReader for FixedLenByteArrayReader {
                     IntervalUnit::YearMonth => Arc::new(
                         binary
                             .iter()
-                            .map(|o| {
-                                o.map(|b| 
i32::from_le_bytes(b[0..4].try_into().unwrap()))
-                            })
+                            .map(|o| o.map(|b| 
i32::from_le_bytes(b[0..4].try_into().unwrap())))
                             .collect::<IntervalYearMonthArray>(),
                     ) as ArrayRef,
                     IntervalUnit::DayTime => Arc::new(
                         binary
                             .iter()
-                            .map(|o| {
-                                o.map(|b| {
-                                    
i64::from_le_bytes(b[4..12].try_into().unwrap())
-                                })
-                            })
+                            .map(|o| o.map(|b| 
i64::from_le_bytes(b[4..12].try_into().unwrap())))
                             .collect::<IntervalDayTimeArray>(),
                     ) as ArrayRef,
                     IntervalUnit::MonthDayNano => {
@@ -247,7 +240,7 @@ impl ArrayReader for FixedLenByteArrayReader {
 }
 
 struct FixedLenByteArrayBuffer {
-    buffer: ScalarBuffer<u8>,
+    buffer: Vec<u8>,
     /// The length of each element in bytes
     byte_length: usize,
 }
@@ -263,14 +256,14 @@ impl BufferQueue for FixedLenByteArrayBuffer {
     type Slice = Self;
 
     fn consume(&mut self) -> Self::Output {
-        self.buffer.consume()
+        Buffer::from_vec(self.buffer.consume())
     }
 
-    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+    fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice {
         self
     }
 
-    fn set_len(&mut self, len: usize) {
+    fn truncate_buffer(&mut self, len: usize) {
         assert_eq!(self.buffer.len(), len * self.byte_length);
     }
 }
@@ -288,14 +281,10 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
             (read_offset + values_read) * self.byte_length
         );
         self.buffer
-            .resize((read_offset + levels_read) * self.byte_length);
-
-        let slice = self.buffer.as_slice_mut();
+            .resize((read_offset + levels_read) * self.byte_length, 0);
 
         let values_range = read_offset..read_offset + values_read;
-        for (value_pos, level_pos) in
-            values_range.rev().zip(iter_set_bits_rev(valid_mask))
-        {
+        for (value_pos, level_pos) in 
values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
             debug_assert!(level_pos >= value_pos);
             if level_pos <= value_pos {
                 break;
@@ -305,7 +294,7 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
             let value_pos_bytes = value_pos * self.byte_length;
 
             for i in 0..self.byte_length {
-                slice[level_pos_bytes + i] = slice[value_pos_bytes + i]
+                self.buffer[level_pos_bytes + i] = self.buffer[value_pos_bytes + i]
             }
         }
     }
@@ -391,8 +380,7 @@ impl ColumnValueDecoder for ValueDecoder {
         let len = range.end - range.start;
         match self.decoder.as_mut().unwrap() {
             Decoder::Plain { offset, buf } => {
-                let to_read =
-                    (len * self.byte_length).min(buf.len() - *offset) / 
self.byte_length;
+                let to_read = (len * self.byte_length).min(buf.len() - 
*offset) / self.byte_length;
                 let end_offset = *offset + to_read * self.byte_length;
                 out.buffer
                     .extend_from_slice(&buf.as_ref()[*offset..end_offset]);
@@ -485,15 +473,12 @@ mod tests {
         .build()
         .unwrap();
 
-        let written = RecordBatch::try_from_iter([(
-            "list",
-            Arc::new(ListArray::from(data)) as ArrayRef,
-        )])
-        .unwrap();
+        let written =
+            RecordBatch::try_from_iter([("list", 
Arc::new(ListArray::from(data)) as ArrayRef)])
+                .unwrap();
 
         let mut buffer = Vec::with_capacity(1024);
-        let mut writer =
-            ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
+        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), 
None).unwrap();
         writer.write(&written).unwrap();
         writer.close().unwrap();
 
diff --git a/parquet/src/arrow/array_reader/null_array.rs b/parquet/src/arrow/array_reader/null_array.rs
index 4ad6c97e2f..bb32fb307f 100644
--- a/parquet/src/arrow/array_reader/null_array.rs
+++ b/parquet/src/arrow/array_reader/null_array.rs
@@ -16,14 +16,13 @@
 // under the License.
 
 use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
-use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::RecordReader;
 use crate::column::page::PageIterator;
 use crate::data_type::DataType;
 use crate::errors::Result;
 use crate::schema::types::ColumnDescPtr;
 use arrow_array::ArrayRef;
-use arrow_buffer::Buffer;
+use arrow_buffer::{ArrowNativeType, Buffer};
 use arrow_schema::DataType as ArrowType;
 use std::any::Any;
 use std::sync::Arc;
@@ -33,7 +32,7 @@ use std::sync::Arc;
 pub struct NullArrayReader<T>
 where
     T: DataType,
-    T::T: ScalarValue,
+    T::T: ArrowNativeType,
 {
     data_type: ArrowType,
     pages: Box<dyn PageIterator>,
@@ -45,7 +44,7 @@ where
 impl<T> NullArrayReader<T>
 where
     T: DataType,
-    T::T: ScalarValue,
+    T::T: ArrowNativeType,
 {
     /// Construct null array reader.
     pub fn new(pages: Box<dyn PageIterator>, column_desc: ColumnDescPtr) -> 
Result<Self> {
@@ -65,7 +64,7 @@ where
 impl<T> ArrayReader for NullArrayReader<T>
 where
     T: DataType,
-    T::T: ScalarValue,
+    T::T: ArrowNativeType,
 {
     fn as_any(&self) -> &dyn Any {
         self
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs
index f833eccecb..507b6215ca 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
-use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::RecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::Type as PhysicalType;
@@ -26,22 +25,55 @@ use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use arrow_array::Decimal256Array;
 use arrow_array::{
-    builder::{BooleanBufferBuilder, TimestampNanosecondBufferBuilder},
-    ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, 
Int32Array,
-    Int64Array, TimestampNanosecondArray, UInt32Array, UInt64Array,
+    builder::TimestampNanosecondBufferBuilder, ArrayRef, BooleanArray, 
Decimal128Array,
+    Float32Array, Float64Array, Int32Array, Int64Array, 
TimestampNanosecondArray, UInt32Array,
+    UInt64Array,
 };
-use arrow_buffer::{i256, Buffer};
+use arrow_buffer::{i256, BooleanBuffer, Buffer};
 use arrow_data::ArrayDataBuilder;
 use arrow_schema::{DataType as ArrowType, TimeUnit};
 use std::any::Any;
 use std::sync::Arc;
 
+/// Provides conversion from `Vec<T>` to `Buffer`
+pub trait IntoBuffer {
+    fn into_buffer(self) -> Buffer;
+}
+
+macro_rules! native_buffer {
+    ($($t:ty),*) => {
+        $(impl IntoBuffer for Vec<$t> {
+            fn into_buffer(self) -> Buffer {
+                Buffer::from_vec(self)
+            }
+        })*
+    };
+}
+native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);
+
+impl IntoBuffer for Vec<bool> {
+    fn into_buffer(self) -> Buffer {
+        BooleanBuffer::from_iter(self).into_inner()
+    }
+}
+
+impl IntoBuffer for Vec<Int96> {
+    fn into_buffer(self) -> Buffer {
+        let mut builder = TimestampNanosecondBufferBuilder::new(self.len());
+        for v in self {
+            builder.append(v.to_nanos())
+        }
+        builder.finish()
+    }
+}
+
 /// Primitive array readers are leaves of array reader tree. They accept page 
iterator
 /// and read them into primitive arrays.
 pub struct PrimitiveArrayReader<T>
 where
     T: DataType,
-    T::T: ScalarValue,
+    T::T: Copy + Default,
+    Vec<T::T>: IntoBuffer,
 {
     data_type: ArrowType,
     pages: Box<dyn PageIterator>,
@@ -53,7 +85,8 @@ where
 impl<T> PrimitiveArrayReader<T>
 where
     T: DataType,
-    T::T: ScalarValue,
+    T::T: Copy + Default,
+    Vec<T::T>: IntoBuffer,
 {
     /// Construct primitive array reader.
     pub fn new(
@@ -85,7 +118,8 @@ where
 impl<T> ArrayReader for PrimitiveArrayReader<T>
 where
     T: DataType,
-    T::T: ScalarValue,
+    T::T: Copy + Default,
+    Vec<T::T>: IntoBuffer,
 {
     fn as_any(&self) -> &dyn Any {
         self
@@ -131,40 +165,14 @@ where
                 _ => unreachable!("INT96 must be timestamp nanosecond"),
             },
             PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
-                unreachable!(
-                    "PrimitiveArrayReaders don't support complex physical 
types"
-                );
+                unreachable!("PrimitiveArrayReaders don't support complex 
physical types");
             }
         };
 
         // Convert to arrays by using the Parquet physical type.
         // The physical types are then cast to Arrow types if necessary
 
-        let record_data = self.record_reader.consume_record_data();
-        let record_data = match T::get_physical_type() {
-            PhysicalType::BOOLEAN => {
-                let mut boolean_buffer = 
BooleanBufferBuilder::new(record_data.len());
-
-                for e in record_data.as_slice() {
-                    boolean_buffer.append(*e > 0);
-                }
-                boolean_buffer.into()
-            }
-            PhysicalType::INT96 => {
-                // SAFETY - record_data is an aligned buffer of Int96
-                let (prefix, slice, suffix) =
-                    unsafe { record_data.as_slice().align_to::<Int96>() };
-                assert!(prefix.is_empty() && suffix.is_empty());
-
-                let mut builder = 
TimestampNanosecondBufferBuilder::new(slice.len());
-                for v in slice {
-                    builder.append(v.to_nanos())
-                }
-
-                builder.finish()
-            }
-            _ => record_data,
-        };
+        let record_data = 
self.record_reader.consume_record_data().into_buffer();
 
         let array_data = ArrayDataBuilder::new(arrow_data_type)
             .len(self.record_reader.num_values())
@@ -188,9 +196,7 @@ where
             PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
             PhysicalType::INT96 => 
Arc::new(TimestampNanosecondArray::from(array_data)),
             PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
-                unreachable!(
-                    "PrimitiveArrayReaders don't support complex physical 
types"
-                );
+                unreachable!("PrimitiveArrayReaders don't support complex 
physical types");
             }
         };
 
@@ -409,12 +415,9 @@ mod tests {
             );
             let page_iterator = InMemoryPageIterator::new(page_lists);
 
-            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
-                Box::new(page_iterator),
-                column_desc,
-                None,
-            )
-            .unwrap();
+            let mut array_reader =
+                
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, 
None)
+                    .unwrap();
 
             // Read first 50 values, which are all from the first column chunk
             let array = array_reader.next_batch(50).unwrap();
@@ -618,12 +621,9 @@ mod tests {
 
             let page_iterator = InMemoryPageIterator::new(page_lists);
 
-            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
-                Box::new(page_iterator),
-                column_desc,
-                None,
-            )
-            .unwrap();
+            let mut array_reader =
+                
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, 
None)
+                    .unwrap();
 
             let mut accu_len: usize = 0;
 
@@ -697,12 +697,9 @@ mod tests {
             );
             let page_iterator = InMemoryPageIterator::new(page_lists);
 
-            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
-                Box::new(page_iterator),
-                column_desc,
-                None,
-            )
-            .unwrap();
+            let mut array_reader =
+                
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, 
None)
+                    .unwrap();
 
             // read data from the reader
             // the data type is decimal(8,2)
@@ -759,12 +756,9 @@ mod tests {
             );
             let page_iterator = InMemoryPageIterator::new(page_lists);
 
-            let mut array_reader = PrimitiveArrayReader::<Int64Type>::new(
-                Box::new(page_iterator),
-                column_desc,
-                None,
-            )
-            .unwrap();
+            let mut array_reader =
+                
PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, 
None)
+                    .unwrap();
 
             // read data from the reader
             // the data type is decimal(18,4)
diff --git a/parquet/src/arrow/buffer/dictionary_buffer.rs b/parquet/src/arrow/buffer/dictionary_buffer.rs
index 4208318122..d0f63024ed 100644
--- a/parquet/src/arrow/buffer/dictionary_buffer.rs
+++ b/parquet/src/arrow/buffer/dictionary_buffer.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::arrow::buffer::offset_buffer::OffsetBuffer;
-use crate::arrow::record_reader::buffer::{BufferQueue, ScalarBuffer, 
ScalarValue, ValuesBuffer};
+use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer};
 use crate::column::reader::decoder::ValuesBufferSlice;
 use crate::errors::{ParquetError, Result};
 use arrow_array::{make_array, Array, ArrayRef, OffsetSizeTrait};
@@ -27,17 +27,12 @@ use std::sync::Arc;
 
 /// An array of variable length byte arrays that are potentially dictionary 
encoded
 /// and can be converted into a corresponding [`ArrayRef`]
-pub enum DictionaryBuffer<K: ScalarValue, V: ScalarValue> {
-    Dict {
-        keys: ScalarBuffer<K>,
-        values: ArrayRef,
-    },
-    Values {
-        values: OffsetBuffer<V>,
-    },
+pub enum DictionaryBuffer<K: ArrowNativeType, V: OffsetSizeTrait> {
+    Dict { keys: Vec<K>, values: ArrayRef },
+    Values { values: OffsetBuffer<V> },
 }
 
-impl<K: ScalarValue, V: ScalarValue> Default for DictionaryBuffer<K, V> {
+impl<K: ArrowNativeType, V: OffsetSizeTrait> Default for DictionaryBuffer<K, 
V> {
     fn default() -> Self {
         Self::Values {
             values: Default::default(),
@@ -45,9 +40,7 @@ impl<K: ScalarValue, V: ScalarValue> Default for DictionaryBuffer<K, V> {
     }
 }
 
-impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue + OffsetSizeTrait>
-    DictionaryBuffer<K, V>
-{
+impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait> DictionaryBuffer<K, V> {
     #[allow(unused)]
     pub fn len(&self) -> usize {
         match self {
@@ -63,7 +56,7 @@ impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue + OffsetSizeTrait>
     /// # Panic
     ///
     /// Panics if the dictionary is too large for `K`
-    pub fn as_keys(&mut self, dictionary: &ArrayRef) -> Option<&mut 
ScalarBuffer<K>> {
+    pub fn as_keys(&mut self, dictionary: &ArrayRef) -> Option<&mut Vec<K>> {
         assert!(K::from_usize(dictionary.len()).is_some());
 
         match self {
@@ -112,7 +105,7 @@ impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue 
+ OffsetSizeTrait>
 
                 if values.is_empty() {
                     // If dictionary is empty, zero pad offsets
-                    spilled.offsets.resize(keys.len() + 1);
+                    spilled.offsets.resize(keys.len() + 1, V::default());
                 } else {
                     // Note: at this point null positions will have arbitrary 
dictionary keys
                     // and this will hydrate them to the corresponding byte 
array. This is
@@ -164,7 +157,7 @@ impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue 
+ OffsetSizeTrait>
 
                 let builder = ArrayDataBuilder::new(data_type.clone())
                     .len(keys.len())
-                    .add_buffer(keys.into())
+                    .add_buffer(Buffer::from_vec(keys))
                     .add_child_data(values.into_data())
                     .null_bit_buffer(null_buffer);
 
@@ -192,13 +185,13 @@ impl<K: ScalarValue + ArrowNativeType + Ord, V: 
ScalarValue + OffsetSizeTrait>
     }
 }
 
-impl<K: ScalarValue, V: ScalarValue> ValuesBufferSlice for DictionaryBuffer<K, 
V> {
+impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBufferSlice for 
DictionaryBuffer<K, V> {
     fn capacity(&self) -> usize {
         usize::MAX
     }
 }
 
-impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> ValuesBuffer for 
DictionaryBuffer<K, V> {
+impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for 
DictionaryBuffer<K, V> {
     fn pad_nulls(
         &mut self,
         read_offset: usize,
@@ -208,7 +201,7 @@ impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> 
ValuesBuffer for Dictiona
     ) {
         match self {
             Self::Dict { keys, .. } => {
-                keys.resize(read_offset + levels_read);
+                keys.resize(read_offset + levels_read, K::default());
                 keys.pad_nulls(read_offset, values_read, levels_read, 
valid_mask)
             }
             Self::Values { values, .. } => {
@@ -218,7 +211,7 @@ impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> 
ValuesBuffer for Dictiona
     }
 }
 
-impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> BufferQueue for 
DictionaryBuffer<K, V> {
+impl<K: ArrowNativeType, V: OffsetSizeTrait> BufferQueue for 
DictionaryBuffer<K, V> {
     type Output = Self;
     type Slice = Self;
 
@@ -234,14 +227,14 @@ impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> 
BufferQueue for Dictionar
         }
     }
 
-    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+    fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice {
         self
     }
 
-    fn set_len(&mut self, len: usize) {
+    fn truncate_buffer(&mut self, len: usize) {
         match self {
-            Self::Dict { keys, .. } => keys.set_len(len),
-            Self::Values { values } => values.set_len(len),
+            Self::Dict { keys, .. } => keys.truncate_buffer(len),
+            Self::Values { values } => values.truncate_buffer(len),
         }
     }
 }
diff --git a/parquet/src/arrow/buffer/offset_buffer.rs 
b/parquet/src/arrow/buffer/offset_buffer.rs
index 3f8f85494f..459c94ed28 100644
--- a/parquet/src/arrow/buffer/offset_buffer.rs
+++ b/parquet/src/arrow/buffer/offset_buffer.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::arrow::buffer::bit_util::iter_set_bits_rev;
-use crate::arrow::record_reader::buffer::{BufferQueue, ScalarBuffer, 
ScalarValue, ValuesBuffer};
+use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer};
 use crate::column::reader::decoder::ValuesBufferSlice;
 use crate::errors::{ParquetError, Result};
 use arrow_array::{make_array, ArrayRef, OffsetSizeTrait};
@@ -27,23 +27,23 @@ use arrow_schema::DataType as ArrowType;
 /// A buffer of variable-sized byte arrays that can be converted into
 /// a corresponding [`ArrayRef`]
 #[derive(Debug)]
-pub struct OffsetBuffer<I: ScalarValue> {
-    pub offsets: ScalarBuffer<I>,
-    pub values: ScalarBuffer<u8>,
+pub struct OffsetBuffer<I: OffsetSizeTrait> {
+    pub offsets: Vec<I>,
+    pub values: Vec<u8>,
 }
 
-impl<I: ScalarValue> Default for OffsetBuffer<I> {
+impl<I: OffsetSizeTrait> Default for OffsetBuffer<I> {
     fn default() -> Self {
-        let mut offsets = ScalarBuffer::new();
-        offsets.resize(1);
+        let mut offsets = Vec::new();
+        offsets.resize(1, I::default());
         Self {
             offsets,
-            values: ScalarBuffer::new(),
+            values: Vec::new(),
         }
     }
 }
 
-impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
+impl<I: OffsetSizeTrait> OffsetBuffer<I> {
     /// Returns the number of byte arrays in this buffer
     pub fn len(&self) -> usize {
         self.offsets.len() - 1
@@ -128,8 +128,8 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
     pub fn into_array(self, null_buffer: Option<Buffer>, data_type: ArrowType) 
-> ArrayRef {
         let array_data_builder = ArrayDataBuilder::new(data_type)
             .len(self.len())
-            .add_buffer(self.offsets.into())
-            .add_buffer(self.values.into())
+            .add_buffer(Buffer::from_vec(self.offsets))
+            .add_buffer(Buffer::from_vec(self.values))
             .null_bit_buffer(null_buffer);
 
         let data = match cfg!(debug_assertions) {
@@ -141,7 +141,7 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
     }
 }
 
-impl<I: OffsetSizeTrait + ScalarValue> BufferQueue for OffsetBuffer<I> {
+impl<I: OffsetSizeTrait> BufferQueue for OffsetBuffer<I> {
     type Output = Self;
     type Slice = Self;
 
@@ -149,16 +149,16 @@ impl<I: OffsetSizeTrait + ScalarValue> BufferQueue for 
OffsetBuffer<I> {
         std::mem::take(self)
     }
 
-    fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+    fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice {
         self
     }
 
-    fn set_len(&mut self, len: usize) {
+    fn truncate_buffer(&mut self, len: usize) {
         assert_eq!(self.offsets.len(), len + 1);
     }
 }
 
-impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for OffsetBuffer<I> {
+impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
     fn pad_nulls(
         &mut self,
         read_offset: usize,
@@ -167,9 +167,10 @@ impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for 
OffsetBuffer<I> {
         valid_mask: &[u8],
     ) {
         assert_eq!(self.offsets.len(), read_offset + values_read + 1);
-        self.offsets.resize(read_offset + levels_read + 1);
+        self.offsets
+            .resize(read_offset + levels_read + 1, I::default());
 
-        let offsets = self.offsets.as_slice_mut();
+        let offsets = &mut self.offsets;
 
         let mut last_pos = read_offset + levels_read + 1;
         let mut last_start_offset = I::from_usize(self.values.len()).unwrap();
@@ -207,7 +208,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for 
OffsetBuffer<I> {
     }
 }
 
-impl<I: ScalarValue> ValuesBufferSlice for OffsetBuffer<I> {
+impl<I: OffsetSizeTrait> ValuesBufferSlice for OffsetBuffer<I> {
     fn capacity(&self) -> usize {
         usize::MAX
     }
diff --git a/parquet/src/arrow/record_reader/buffer.rs 
b/parquet/src/arrow/record_reader/buffer.rs
index 35a322e6c7..3914710ff7 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -15,11 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::marker::PhantomData;
-
 use crate::arrow::buffer::bit_util::iter_set_bits_rev;
-use crate::data_type::Int96;
-use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
 
 /// A buffer that supports writing new data to the end, and removing data from 
the front
 ///
@@ -37,12 +33,12 @@ pub trait BufferQueue: Sized {
     /// to append data to the end of this [`BufferQueue`]
     ///
     /// NB: writes to the returned slice will not update the length of 
[`BufferQueue`]
-    /// instead a subsequent call should be made to [`BufferQueue::set_len`]
-    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+    /// instead a subsequent call should be made to 
[`BufferQueue::truncate_buffer`]
+    fn get_output_slice(&mut self, batch_size: usize) -> &mut Self::Slice;
 
     /// Sets the length of the [`BufferQueue`].
     ///
-    /// Intended to be used in combination with 
[`BufferQueue::spare_capacity_mut`]
+    /// Intended to be used in combination with 
[`BufferQueue::get_output_slice`]
     ///
     /// # Panics
     ///
@@ -57,132 +53,27 @@ pub trait BufferQueue: Sized {
     /// track how much of this slice is actually written to by the caller. 
This is still
     /// safe as the slice is default-initialized.
     ///
-    fn set_len(&mut self, len: usize);
-}
-
-/// A marker trait for [scalar] types
-///
-/// This means that a `[Self::default()]` of length `len` can be safely 
created from a
-/// zero-initialized `[u8]` with length `len * std::mem::size_of::<Self>()` and
-/// alignment of `std::mem::size_of::<Self>()`
-///
-/// [scalar]: 
https://doc.rust-lang.org/book/ch03-02-data-types.html#scalar-types
-///
-pub trait ScalarValue: Copy {}
-impl ScalarValue for bool {}
-impl ScalarValue for u8 {}
-impl ScalarValue for i8 {}
-impl ScalarValue for u16 {}
-impl ScalarValue for i16 {}
-impl ScalarValue for u32 {}
-impl ScalarValue for i32 {}
-impl ScalarValue for u64 {}
-impl ScalarValue for i64 {}
-impl ScalarValue for f32 {}
-impl ScalarValue for f64 {}
-impl ScalarValue for Int96 {}
-
-/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for 
storage
-#[derive(Debug)]
-pub struct ScalarBuffer<T: ScalarValue> {
-    buffer: MutableBuffer,
-
-    /// Length in elements of size T
-    len: usize,
-
-    /// Placeholder to allow `T` as an invariant generic parameter
-    /// without making it !Send
-    _phantom: PhantomData<fn(T) -> T>,
-}
-
-impl<T: ScalarValue> Default for ScalarBuffer<T> {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl<T: ScalarValue> ScalarBuffer<T> {
-    pub fn new() -> Self {
-        Self {
-            buffer: MutableBuffer::new(0),
-            len: 0,
-            _phantom: Default::default(),
-        }
-    }
-
-    pub fn len(&self) -> usize {
-        self.len
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.len == 0
-    }
-
-    pub fn reserve(&mut self, additional: usize) {
-        self.buffer.reserve(additional * std::mem::size_of::<T>());
-    }
-
-    pub fn resize(&mut self, len: usize) {
-        self.buffer.resize(len * std::mem::size_of::<T>(), 0);
-        self.len = len;
-    }
-
-    #[inline]
-    pub fn as_slice(&self) -> &[T] {
-        let (prefix, buf, suffix) = unsafe { 
self.buffer.as_slice().align_to::<T>() };
-        assert!(prefix.is_empty() && suffix.is_empty());
-        buf
-    }
-
-    #[inline]
-    pub fn as_slice_mut(&mut self) -> &mut [T] {
-        let (prefix, buf, suffix) = unsafe { 
self.buffer.as_slice_mut().align_to_mut::<T>() };
-        assert!(prefix.is_empty() && suffix.is_empty());
-        buf
-    }
+    fn truncate_buffer(&mut self, len: usize);
 }
 
-impl<T: ScalarValue + ArrowNativeType> ScalarBuffer<T> {
-    pub fn push(&mut self, v: T) {
-        self.buffer.push(v);
-        self.len += 1;
-    }
-
-    pub fn extend_from_slice(&mut self, v: &[T]) {
-        self.buffer.extend_from_slice(v);
-        self.len += v.len();
-    }
-}
-
-impl<T: ScalarValue> From<ScalarBuffer<T>> for Buffer {
-    fn from(t: ScalarBuffer<T>) -> Self {
-        t.buffer.into()
-    }
-}
-
-impl<T: ScalarValue> BufferQueue for ScalarBuffer<T> {
-    type Output = Buffer;
+impl<T: Copy + Default> BufferQueue for Vec<T> {
+    type Output = Self;
 
     type Slice = [T];
 
     fn consume(&mut self) -> Self::Output {
-        std::mem::take(self).into()
+        std::mem::take(self)
     }
 
-    fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
-        self.buffer
-            .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);
-
-        let range = self.len..self.len + batch_size;
-        &mut self.as_slice_mut()[range]
+    fn get_output_slice(&mut self, batch_size: usize) -> &mut Self::Slice {
+        let len = self.len();
+        self.resize(len + batch_size, T::default());
+        &mut self[len..]
     }
 
-    fn set_len(&mut self, len: usize) {
-        self.len = len;
-
-        let new_bytes = self.len * std::mem::size_of::<T>();
-        assert!(new_bytes <= self.buffer.len());
-        self.buffer.resize(new_bytes, 0);
+    fn truncate_buffer(&mut self, len: usize) {
+        assert!(len <= self.len());
+        self.truncate(len)
     }
 }
 
@@ -212,7 +103,7 @@ pub trait ValuesBuffer: BufferQueue {
     );
 }
 
-impl<T: ScalarValue> ValuesBuffer for ScalarBuffer<T> {
+impl<T: Copy + Default> ValuesBuffer for Vec<T> {
     fn pad_nulls(
         &mut self,
         read_offset: usize,
@@ -220,8 +111,7 @@ impl<T: ScalarValue> ValuesBuffer for ScalarBuffer<T> {
         levels_read: usize,
         valid_mask: &[u8],
     ) {
-        let slice = self.as_slice_mut();
-        assert!(slice.len() >= read_offset + levels_read);
+        assert!(self.len() >= read_offset + levels_read);
 
         let values_range = read_offset..read_offset + values_read;
         for (value_pos, level_pos) in 
values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
@@ -229,7 +119,7 @@ impl<T: ScalarValue> ValuesBuffer for ScalarBuffer<T> {
             if level_pos <= value_pos {
                 break;
             }
-            slice[level_pos] = slice[value_pos];
+            self[level_pos] = self[value_pos];
         }
     }
 }
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs 
b/parquet/src/arrow/record_reader/definition_levels.rs
index 9009c596c4..fa041f5fdb 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -30,12 +30,10 @@ use crate::column::reader::decoder::{
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 
-use super::buffer::ScalarBuffer;
-
 enum BufferInner {
     /// Compute levels and null mask
     Full {
-        levels: ScalarBuffer<i16>,
+        levels: Vec<i16>,
         nulls: BooleanBufferBuilder,
         max_level: i16,
     },
@@ -77,7 +75,7 @@ impl DefinitionLevelBuffer {
                 }
             }
             false => BufferInner::Full {
-                levels: ScalarBuffer::new(),
+                levels: Vec::new(),
                 nulls: BooleanBufferBuilder::new(0),
                 max_level: desc.max_def_level(),
             },
@@ -89,7 +87,7 @@ impl DefinitionLevelBuffer {
     /// Returns the built level data
     pub fn consume_levels(&mut self) -> Option<Buffer> {
         match &mut self.inner {
-            BufferInner::Full { levels, .. } => 
Some(std::mem::take(levels).into()),
+            BufferInner::Full { levels, .. } => 
Some(Buffer::from_vec(std::mem::take(levels))),
             BufferInner::Mask { .. } => None,
         }
     }
@@ -174,9 +172,9 @@ impl DefinitionLevelDecoder for 
DefinitionLevelBufferDecoder {
                 assert_eq!(self.max_level, *max_level);
                 assert_eq!(range.start + writer.len, nulls.len());
 
-                levels.resize(range.end + writer.len);
+                levels.resize(range.end + writer.len, 0);
 
-                let slice = &mut levels.as_slice_mut()[writer.len..];
+                let slice = &mut levels[writer.len..];
                 let levels_read = decoder.read_def_levels(slice, 
range.clone())?;
 
                 nulls.reserve(levels_read);
diff --git a/parquet/src/arrow/record_reader/mod.rs 
b/parquet/src/arrow/record_reader/mod.rs
index ea98234199..49c69c87e3 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -18,7 +18,7 @@
 use arrow_buffer::Buffer;
 
 use crate::arrow::record_reader::{
-    buffer::{BufferQueue, ScalarBuffer, ValuesBuffer},
+    buffer::{BufferQueue, ValuesBuffer},
     definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder},
 };
 use crate::column::reader::decoder::RepetitionLevelDecoderImpl;
@@ -37,8 +37,7 @@ pub(crate) mod buffer;
 mod definition_levels;
 
 /// A `RecordReader` is a stateful column reader that delimits semantic 
records.
-pub type RecordReader<T> =
-    GenericRecordReader<ScalarBuffer<<T as DataType>::T>, 
ColumnValueDecoderImpl<T>>;
+pub type RecordReader<T> = GenericRecordReader<Vec<<T as DataType>::T>, 
ColumnValueDecoderImpl<T>>;
 
 pub(crate) type ColumnReader<CV> =
     GenericColumnReader<RepetitionLevelDecoderImpl, 
DefinitionLevelBufferDecoder, CV>;
@@ -53,7 +52,7 @@ pub struct GenericRecordReader<V, CV> {
 
     values: V,
     def_levels: Option<DefinitionLevelBuffer>,
-    rep_levels: Option<ScalarBuffer<i16>>,
+    rep_levels: Option<Vec<i16>>,
     column_reader: Option<ColumnReader<CV>>,
     /// Number of buffered levels / null-padded values
     num_values: usize,
@@ -81,7 +80,7 @@ where
         let def_levels = (desc.max_def_level() > 0)
             .then(|| DefinitionLevelBuffer::new(&desc, 
packed_null_mask(&desc)));
 
-        let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new);
+        let rep_levels = (desc.max_rep_level() > 0).then(Vec::new);
 
         Self {
             values: records,
@@ -174,7 +173,9 @@ where
     /// Return repetition level data.
     /// The side effect is similar to `consume_def_levels`.
     pub fn consume_rep_levels(&mut self) -> Option<Buffer> {
-        self.rep_levels.as_mut().map(|x| x.consume())
+        self.rep_levels
+            .as_mut()
+            .map(|x| Buffer::from_vec(x.consume()))
     }
 
     /// Returns currently stored buffer data.
@@ -209,9 +210,9 @@ where
         let rep_levels = self
             .rep_levels
             .as_mut()
-            .map(|levels| levels.spare_capacity_mut(batch_size));
+            .map(|levels| levels.get_output_slice(batch_size));
         let def_levels = self.def_levels.as_mut();
-        let values = self.values.spare_capacity_mut(batch_size);
+        let values = self.values.get_output_slice(batch_size);
 
         let (records_read, values_read, levels_read) = self
             .column_reader
@@ -234,9 +235,9 @@ where
 
         self.num_records += records_read;
         self.num_values += levels_read;
-        self.values.set_len(self.num_values);
+        self.values.truncate_buffer(self.num_values);
         if let Some(ref mut buf) = self.rep_levels {
-            buf.set_len(self.num_values)
+            buf.truncate_buffer(self.num_values)
         };
         if let Some(ref mut buf) = self.def_levels {
             buf.set_len(self.num_values)
@@ -257,7 +258,7 @@ mod tests {
     use std::sync::Arc;
 
     use arrow::buffer::Buffer;
-    use arrow_array::builder::{Int16BufferBuilder, Int32BufferBuilder};
+    use arrow_array::builder::Int16BufferBuilder;
 
     use crate::basic::Encoding;
     use crate::data_type::Int32Type;
@@ -334,10 +335,7 @@ mod tests {
             assert_eq!(7, record_reader.num_values());
         }
 
-        let mut bb = Int32BufferBuilder::new(7);
-        bb.append_slice(&[4, 7, 6, 3, 2, 8, 9]);
-        let expected_buffer = bb.finish();
-        assert_eq!(expected_buffer, record_reader.consume_record_data());
+        assert_eq!(record_reader.consume_record_data(), &[4, 7, 6, 3, 2, 8, 
9]);
         assert_eq!(None, record_reader.consume_def_levels());
         assert_eq!(None, record_reader.consume_bitmap());
     }
@@ -434,13 +432,12 @@ mod tests {
 
         // Verify result record data
         let actual = record_reader.consume_record_data();
-        let actual_values = actual.typed_data::<i32>();
 
         let expected = &[0, 7, 0, 6, 3, 0, 8];
-        assert_eq!(actual_values.len(), expected.len());
+        assert_eq!(actual.len(), expected.len());
 
         // Only validate valid values are equal
-        let iter = expected_valid.iter().zip(actual_values).zip(expected);
+        let iter = expected_valid.iter().zip(&actual).zip(expected);
         for ((valid, actual), expected) in iter {
             if *valid {
                 assert_eq!(actual, expected)
@@ -544,12 +541,11 @@ mod tests {
 
         // Verify result record data
         let actual = record_reader.consume_record_data();
-        let actual_values = actual.typed_data::<i32>();
         let expected = &[4, 0, 0, 7, 6, 3, 2, 8, 9];
-        assert_eq!(actual_values.len(), expected.len());
+        assert_eq!(actual.len(), expected.len());
 
         // Only validate valid values are equal
-        let iter = expected_valid.iter().zip(actual_values).zip(expected);
+        let iter = expected_valid.iter().zip(&actual).zip(expected);
         for ((valid, actual), expected) in iter {
             if *valid {
                 assert_eq!(actual, expected)
@@ -713,10 +709,7 @@ mod tests {
             assert_eq!(0, record_reader.read_records(10).unwrap());
         }
 
-        let mut bb = Int32BufferBuilder::new(3);
-        bb.append_slice(&[6, 3, 2]);
-        let expected_buffer = bb.finish();
-        assert_eq!(expected_buffer, record_reader.consume_record_data());
+        assert_eq!(record_reader.consume_record_data(), &[6, 3, 2]);
         assert_eq!(None, record_reader.consume_def_levels());
         assert_eq!(None, record_reader.consume_bitmap());
     }
@@ -814,13 +807,12 @@ mod tests {
 
         // Verify result record data
         let actual = record_reader.consume_record_data();
-        let actual_values = actual.typed_data::<i32>();
 
         let expected = &[0, 6, 3];
-        assert_eq!(actual_values.len(), expected.len());
+        assert_eq!(actual.len(), expected.len());
 
         // Only validate valid values are equal
-        let iter = expected_valid.iter().zip(actual_values).zip(expected);
+        let iter = expected_valid.iter().zip(&actual).zip(expected);
         for ((valid, actual), expected) in iter {
             if *valid {
                 assert_eq!(actual, expected)

Reply via email to