This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 2a213bc36f Remove ScalarBuffer from parquet (#1849) (#5177) (#5178)
2a213bc36f is described below
commit 2a213bc36fdbbe8a51d4307b3c55be856e810af4
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Dec 8 14:53:44 2023 +0000
Remove ScalarBuffer from parquet (#1849) (#5177) (#5178)
---
parquet/src/arrow/array_reader/byte_array.rs | 66 ++++------
.../arrow/array_reader/byte_array_dictionary.rs | 45 +++----
.../src/arrow/array_reader/fixed_len_byte_array.rs | 53 +++-----
parquet/src/arrow/array_reader/null_array.rs | 9 +-
parquet/src/arrow/array_reader/primitive_array.rs | 120 ++++++++---------
parquet/src/arrow/buffer/dictionary_buffer.rs | 41 +++---
parquet/src/arrow/buffer/offset_buffer.rs | 37 +++---
parquet/src/arrow/record_reader/buffer.rs | 144 +++------------------
.../src/arrow/record_reader/definition_levels.rs | 12 +-
parquet/src/arrow/record_reader/mod.rs | 48 +++----
10 files changed, 200 insertions(+), 375 deletions(-)
diff --git a/parquet/src/arrow/array_reader/byte_array.rs
b/parquet/src/arrow/array_reader/byte_array.rs
index 01666c0af4..debe0d6109 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -19,7 +19,6 @@ use crate::arrow::array_reader::{read_records, skip_records,
ArrayReader};
use crate::arrow::buffer::bit_util::sign_extend_be;
use crate::arrow::buffer::offset_buffer::OffsetBuffer;
use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
-use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
@@ -77,7 +76,7 @@ pub fn make_byte_array_reader(
}
/// An [`ArrayReader`] for variable length byte arrays
-struct ByteArrayReader<I: ScalarValue> {
+struct ByteArrayReader<I: OffsetSizeTrait> {
data_type: ArrowType,
pages: Box<dyn PageIterator>,
def_levels_buffer: Option<Buffer>,
@@ -85,14 +84,11 @@ struct ByteArrayReader<I: ScalarValue> {
record_reader: GenericRecordReader<OffsetBuffer<I>,
ByteArrayColumnValueDecoder<I>>,
}
-impl<I: ScalarValue> ByteArrayReader<I> {
+impl<I: OffsetSizeTrait> ByteArrayReader<I> {
fn new(
pages: Box<dyn PageIterator>,
data_type: ArrowType,
- record_reader: GenericRecordReader<
- OffsetBuffer<I>,
- ByteArrayColumnValueDecoder<I>,
- >,
+ record_reader: GenericRecordReader<OffsetBuffer<I>,
ByteArrayColumnValueDecoder<I>>,
) -> Self {
Self {
data_type,
@@ -104,7 +100,7 @@ impl<I: ScalarValue> ByteArrayReader<I> {
}
}
-impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
+impl<I: OffsetSizeTrait> ArrayReader for ByteArrayReader<I> {
fn as_any(&self) -> &dyn Any {
self
}
@@ -167,15 +163,13 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for
ByteArrayReader<I> {
}
/// A [`ColumnValueDecoder`] for variable length byte arrays
-struct ByteArrayColumnValueDecoder<I: ScalarValue> {
+struct ByteArrayColumnValueDecoder<I: OffsetSizeTrait> {
dict: Option<OffsetBuffer<I>>,
decoder: Option<ByteArrayDecoder>,
validate_utf8: bool,
}
-impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
- for ByteArrayColumnValueDecoder<I>
-{
+impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayColumnValueDecoder<I>
{
type Slice = OffsetBuffer<I>;
fn new(desc: &ColumnDescPtr) -> Self {
@@ -275,17 +269,15 @@ impl ByteArrayDecoder {
num_values,
validate_utf8,
)),
- Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
- ByteArrayDecoder::Dictionary(ByteArrayDecoderDictionary::new(
- data, num_levels, num_values,
- ))
- }
+ Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY =>
ByteArrayDecoder::Dictionary(
+ ByteArrayDecoderDictionary::new(data, num_levels, num_values),
+ ),
Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteArrayDecoder::DeltaLength(
ByteArrayDecoderDeltaLength::new(data, validate_utf8)?,
),
- Encoding::DELTA_BYTE_ARRAY => ByteArrayDecoder::DeltaByteArray(
- ByteArrayDecoderDelta::new(data, validate_utf8)?,
- ),
+ Encoding::DELTA_BYTE_ARRAY => {
+
ByteArrayDecoder::DeltaByteArray(ByteArrayDecoderDelta::new(data,
validate_utf8)?)
+ }
_ => {
return Err(general_err!(
"unsupported encoding for byte array: {}",
@@ -298,7 +290,7 @@ impl ByteArrayDecoder {
}
/// Read up to `len` values to `out` with the optional dictionary
- pub fn read<I: OffsetSizeTrait + ScalarValue>(
+ pub fn read<I: OffsetSizeTrait>(
&mut self,
out: &mut OffsetBuffer<I>,
len: usize,
@@ -307,8 +299,8 @@ impl ByteArrayDecoder {
match self {
ByteArrayDecoder::Plain(d) => d.read(out, len),
ByteArrayDecoder::Dictionary(d) => {
- let dict = dict
- .ok_or_else(|| general_err!("missing dictionary page for
column"))?;
+ let dict =
+ dict.ok_or_else(|| general_err!("missing dictionary page
for column"))?;
d.read(out, dict, len)
}
@@ -318,7 +310,7 @@ impl ByteArrayDecoder {
}
/// Skip `len` values
- pub fn skip<I: OffsetSizeTrait + ScalarValue>(
+ pub fn skip<I: OffsetSizeTrait>(
&mut self,
len: usize,
dict: Option<&OffsetBuffer<I>>,
@@ -326,8 +318,8 @@ impl ByteArrayDecoder {
match self {
ByteArrayDecoder::Plain(d) => d.skip(len),
ByteArrayDecoder::Dictionary(d) => {
- let dict = dict
- .ok_or_else(|| general_err!("missing dictionary page for
column"))?;
+ let dict =
+ dict.ok_or_else(|| general_err!("missing dictionary page
for column"))?;
d.skip(dict, len)
}
@@ -363,7 +355,7 @@ impl ByteArrayDecoderPlain {
}
}
- pub fn read<I: OffsetSizeTrait + ScalarValue>(
+ pub fn read<I: OffsetSizeTrait>(
&mut self,
output: &mut OffsetBuffer<I>,
len: usize,
@@ -392,8 +384,7 @@ impl ByteArrayDecoderPlain {
if self.offset + 4 > buf.len() {
return Err(ParquetError::EOF("eof decoding byte
array".into()));
}
- let len_bytes: [u8; 4] =
- buf[self.offset..self.offset + 4].try_into().unwrap();
+ let len_bytes: [u8; 4] = buf[self.offset..self.offset +
4].try_into().unwrap();
let len = u32::from_le_bytes(len_bytes);
let start_offset = self.offset + 4;
@@ -424,8 +415,7 @@ impl ByteArrayDecoderPlain {
if self.offset + 4 > buf.len() {
return Err(ParquetError::EOF("eof decoding byte
array".into()));
}
- let len_bytes: [u8; 4] =
- buf[self.offset..self.offset + 4].try_into().unwrap();
+ let len_bytes: [u8; 4] = buf[self.offset..self.offset +
4].try_into().unwrap();
let len = u32::from_le_bytes(len_bytes) as usize;
skip += 1;
self.offset = self.offset + 4 + len;
@@ -462,7 +452,7 @@ impl ByteArrayDecoderDeltaLength {
})
}
- fn read<I: OffsetSizeTrait + ScalarValue>(
+ fn read<I: OffsetSizeTrait>(
&mut self,
output: &mut OffsetBuffer<I>,
len: usize,
@@ -529,7 +519,7 @@ impl ByteArrayDecoderDelta {
})
}
- fn read<I: OffsetSizeTrait + ScalarValue>(
+ fn read<I: OffsetSizeTrait>(
&mut self,
output: &mut OffsetBuffer<I>,
len: usize,
@@ -564,7 +554,7 @@ impl ByteArrayDecoderDictionary {
}
}
- fn read<I: OffsetSizeTrait + ScalarValue>(
+ fn read<I: OffsetSizeTrait>(
&mut self,
output: &mut OffsetBuffer<I>,
dict: &OffsetBuffer<I>,
@@ -576,15 +566,11 @@ impl ByteArrayDecoderDictionary {
}
self.decoder.read(len, |keys| {
- output.extend_from_dictionary(
- keys,
- dict.offsets.as_slice(),
- dict.values.as_slice(),
- )
+ output.extend_from_dictionary(keys, dict.offsets.as_slice(),
dict.values.as_slice())
})
}
- fn skip<I: OffsetSizeTrait + ScalarValue>(
+ fn skip<I: OffsetSizeTrait>(
&mut self,
dict: &OffsetBuffer<I>,
to_skip: usize,
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index 0d216fa083..a381223541 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -27,10 +27,8 @@ use bytes::Bytes;
use crate::arrow::array_reader::byte_array::{ByteArrayDecoder,
ByteArrayDecoderPlain};
use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
-use crate::arrow::buffer::{
- dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer,
-};
-use crate::arrow::record_reader::buffer::{BufferQueue, ScalarValue};
+use crate::arrow::buffer::{dictionary_buffer::DictionaryBuffer,
offset_buffer::OffsetBuffer};
+use crate::arrow::record_reader::buffer::BufferQueue;
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
@@ -123,7 +121,7 @@ pub fn make_byte_array_dictionary_reader(
/// An [`ArrayReader`] for dictionary encoded variable length byte arrays
///
/// Will attempt to preserve any dictionary encoding present in the parquet
data
-struct ByteArrayDictionaryReader<K: ScalarValue, V: ScalarValue> {
+struct ByteArrayDictionaryReader<K: ArrowNativeType, V: OffsetSizeTrait> {
data_type: ArrowType,
pages: Box<dyn PageIterator>,
def_levels_buffer: Option<Buffer>,
@@ -133,16 +131,13 @@ struct ByteArrayDictionaryReader<K: ScalarValue, V:
ScalarValue> {
impl<K, V> ByteArrayDictionaryReader<K, V>
where
- K: FromBytes + ScalarValue + Ord + ArrowNativeType,
- V: ScalarValue + OffsetSizeTrait,
+ K: FromBytes + Ord + ArrowNativeType,
+ V: OffsetSizeTrait,
{
fn new(
pages: Box<dyn PageIterator>,
data_type: ArrowType,
- record_reader: GenericRecordReader<
- DictionaryBuffer<K, V>,
- DictionaryDecoder<K, V>,
- >,
+ record_reader: GenericRecordReader<DictionaryBuffer<K, V>,
DictionaryDecoder<K, V>>,
) -> Self {
Self {
data_type,
@@ -156,8 +151,8 @@ where
impl<K, V> ArrayReader for ByteArrayDictionaryReader<K, V>
where
- K: FromBytes + ScalarValue + Ord + ArrowNativeType,
- V: ScalarValue + OffsetSizeTrait,
+ K: FromBytes + Ord + ArrowNativeType,
+ V: OffsetSizeTrait,
{
fn as_any(&self) -> &dyn Any {
self
@@ -226,16 +221,15 @@ struct DictionaryDecoder<K, V> {
impl<K, V> ColumnValueDecoder for DictionaryDecoder<K, V>
where
- K: FromBytes + ScalarValue + Ord + ArrowNativeType,
- V: ScalarValue + OffsetSizeTrait,
+ K: FromBytes + Ord + ArrowNativeType,
+ V: OffsetSizeTrait,
{
type Slice = DictionaryBuffer<K, V>;
fn new(col: &ColumnDescPtr) -> Self {
let validate_utf8 = col.converted_type() == ConvertedType::UTF8;
- let value_type = match (V::IS_LARGE, col.converted_type() ==
ConvertedType::UTF8)
- {
+ let value_type = match (V::IS_LARGE, col.converted_type() ==
ConvertedType::UTF8) {
(true, true) => ArrowType::LargeUtf8,
(true, false) => ArrowType::LargeBinary,
(false, true) => ArrowType::Utf8,
@@ -274,8 +268,7 @@ where
let len = num_values as usize;
let mut buffer = OffsetBuffer::<V>::default();
- let mut decoder =
- ByteArrayDecoderPlain::new(buf, len, Some(len),
self.validate_utf8);
+ let mut decoder = ByteArrayDecoderPlain::new(buf, len, Some(len),
self.validate_utf8);
decoder.read(&mut buffer, usize::MAX)?;
let array = buffer.into_array(None, self.value_type.clone());
@@ -339,8 +332,8 @@ where
Some(keys) => {
// Happy path - can just copy keys
// Keys will be validated on conversion to arrow
- let keys_slice = keys.spare_capacity_mut(range.start +
len);
- let len = decoder.get_batch(&mut
keys_slice[range.start..])?;
+ let keys_slice = keys.get_output_slice(len);
+ let len = decoder.get_batch(keys_slice)?;
*max_remaining_values -= len;
Ok(len)
}
@@ -360,11 +353,7 @@ where
let dict_offsets = dict_buffers[0].typed_data::<V>();
let dict_values = dict_buffers[1].as_slice();
- values.extend_from_dictionary(
- &keys[..len],
- dict_offsets,
- dict_values,
- )?;
+ values.extend_from_dictionary(&keys[..len],
dict_offsets, dict_values)?;
*max_remaining_values -= len;
Ok(len)
}
@@ -375,9 +364,7 @@ where
fn skip_values(&mut self, num_values: usize) -> Result<usize> {
match self.decoder.as_mut().expect("decoder set") {
- MaybeDictionaryDecoder::Fallback(decoder) => {
- decoder.skip::<V>(num_values, None)
- }
+ MaybeDictionaryDecoder::Fallback(decoder) =>
decoder.skip::<V>(num_values, None),
MaybeDictionaryDecoder::Dict {
decoder,
max_remaining_values,
diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
index b846997d36..849aa37c56 100644
--- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
+++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs
@@ -18,7 +18,7 @@
use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::bit_util::{iter_set_bits_rev, sign_extend_be};
use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
-use crate::arrow::record_reader::buffer::{BufferQueue, ScalarBuffer,
ValuesBuffer};
+use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer};
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{Encoding, Type};
@@ -162,11 +162,10 @@ impl ArrayReader for FixedLenByteArrayReader {
fn consume_batch(&mut self) -> Result<ArrayRef> {
let record_data = self.record_reader.consume_record_data();
- let array_data =
- ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length
as i32))
- .len(self.record_reader.num_values())
- .add_buffer(record_data)
- .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
+ let array_data =
ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
+ .len(self.record_reader.num_values())
+ .add_buffer(record_data)
+ .null_bit_buffer(self.record_reader.consume_bitmap_buffer());
let binary = FixedSizeBinaryArray::from(unsafe {
array_data.build_unchecked() });
@@ -197,19 +196,13 @@ impl ArrayReader for FixedLenByteArrayReader {
IntervalUnit::YearMonth => Arc::new(
binary
.iter()
- .map(|o| {
- o.map(|b|
i32::from_le_bytes(b[0..4].try_into().unwrap()))
- })
+ .map(|o| o.map(|b|
i32::from_le_bytes(b[0..4].try_into().unwrap())))
.collect::<IntervalYearMonthArray>(),
) as ArrayRef,
IntervalUnit::DayTime => Arc::new(
binary
.iter()
- .map(|o| {
- o.map(|b| {
-
i64::from_le_bytes(b[4..12].try_into().unwrap())
- })
- })
+ .map(|o| o.map(|b|
i64::from_le_bytes(b[4..12].try_into().unwrap())))
.collect::<IntervalDayTimeArray>(),
) as ArrayRef,
IntervalUnit::MonthDayNano => {
@@ -247,7 +240,7 @@ impl ArrayReader for FixedLenByteArrayReader {
}
struct FixedLenByteArrayBuffer {
- buffer: ScalarBuffer<u8>,
+ buffer: Vec<u8>,
/// The length of each element in bytes
byte_length: usize,
}
@@ -263,14 +256,14 @@ impl BufferQueue for FixedLenByteArrayBuffer {
type Slice = Self;
fn consume(&mut self) -> Self::Output {
- self.buffer.consume()
+ Buffer::from_vec(self.buffer.consume())
}
- fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+ fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice {
self
}
- fn set_len(&mut self, len: usize) {
+ fn truncate_buffer(&mut self, len: usize) {
assert_eq!(self.buffer.len(), len * self.byte_length);
}
}
@@ -288,14 +281,10 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
(read_offset + values_read) * self.byte_length
);
self.buffer
- .resize((read_offset + levels_read) * self.byte_length);
-
- let slice = self.buffer.as_slice_mut();
+ .resize((read_offset + levels_read) * self.byte_length, 0);
let values_range = read_offset..read_offset + values_read;
- for (value_pos, level_pos) in
- values_range.rev().zip(iter_set_bits_rev(valid_mask))
- {
+ for (value_pos, level_pos) in
values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
debug_assert!(level_pos >= value_pos);
if level_pos <= value_pos {
break;
@@ -305,7 +294,7 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
let value_pos_bytes = value_pos * self.byte_length;
for i in 0..self.byte_length {
- slice[level_pos_bytes + i] = slice[value_pos_bytes + i]
+ self.buffer[level_pos_bytes + i] = self.buffer[value_pos_bytes
+ i]
}
}
}
@@ -391,8 +380,7 @@ impl ColumnValueDecoder for ValueDecoder {
let len = range.end - range.start;
match self.decoder.as_mut().unwrap() {
Decoder::Plain { offset, buf } => {
- let to_read =
- (len * self.byte_length).min(buf.len() - *offset) /
self.byte_length;
+ let to_read = (len * self.byte_length).min(buf.len() -
*offset) / self.byte_length;
let end_offset = *offset + to_read * self.byte_length;
out.buffer
.extend_from_slice(&buf.as_ref()[*offset..end_offset]);
@@ -485,15 +473,12 @@ mod tests {
.build()
.unwrap();
- let written = RecordBatch::try_from_iter([(
- "list",
- Arc::new(ListArray::from(data)) as ArrayRef,
- )])
- .unwrap();
+ let written =
+ RecordBatch::try_from_iter([("list",
Arc::new(ListArray::from(data)) as ArrayRef)])
+ .unwrap();
let mut buffer = Vec::with_capacity(1024);
- let mut writer =
- ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
+ let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(),
None).unwrap();
writer.write(&written).unwrap();
writer.close().unwrap();
diff --git a/parquet/src/arrow/array_reader/null_array.rs
b/parquet/src/arrow/array_reader/null_array.rs
index 4ad6c97e2f..bb32fb307f 100644
--- a/parquet/src/arrow/array_reader/null_array.rs
+++ b/parquet/src/arrow/array_reader/null_array.rs
@@ -16,14 +16,13 @@
// under the License.
use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
-use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::RecordReader;
use crate::column::page::PageIterator;
use crate::data_type::DataType;
use crate::errors::Result;
use crate::schema::types::ColumnDescPtr;
use arrow_array::ArrayRef;
-use arrow_buffer::Buffer;
+use arrow_buffer::{ArrowNativeType, Buffer};
use arrow_schema::DataType as ArrowType;
use std::any::Any;
use std::sync::Arc;
@@ -33,7 +32,7 @@ use std::sync::Arc;
pub struct NullArrayReader<T>
where
T: DataType,
- T::T: ScalarValue,
+ T::T: ArrowNativeType,
{
data_type: ArrowType,
pages: Box<dyn PageIterator>,
@@ -45,7 +44,7 @@ where
impl<T> NullArrayReader<T>
where
T: DataType,
- T::T: ScalarValue,
+ T::T: ArrowNativeType,
{
/// Construct null array reader.
pub fn new(pages: Box<dyn PageIterator>, column_desc: ColumnDescPtr) ->
Result<Self> {
@@ -65,7 +64,7 @@ where
impl<T> ArrayReader for NullArrayReader<T>
where
T: DataType,
- T::T: ScalarValue,
+ T::T: ArrowNativeType,
{
fn as_any(&self) -> &dyn Any {
self
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs
b/parquet/src/arrow/array_reader/primitive_array.rs
index f833eccecb..507b6215ca 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -16,7 +16,6 @@
// under the License.
use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
-use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::RecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::Type as PhysicalType;
@@ -26,22 +25,55 @@ use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow_array::Decimal256Array;
use arrow_array::{
- builder::{BooleanBufferBuilder, TimestampNanosecondBufferBuilder},
- ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array,
Int32Array,
- Int64Array, TimestampNanosecondArray, UInt32Array, UInt64Array,
+ builder::TimestampNanosecondBufferBuilder, ArrayRef, BooleanArray,
Decimal128Array,
+ Float32Array, Float64Array, Int32Array, Int64Array,
TimestampNanosecondArray, UInt32Array,
+ UInt64Array,
};
-use arrow_buffer::{i256, Buffer};
+use arrow_buffer::{i256, BooleanBuffer, Buffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{DataType as ArrowType, TimeUnit};
use std::any::Any;
use std::sync::Arc;
+/// Provides conversion from `Vec<T>` to `Buffer`
+pub trait IntoBuffer {
+ fn into_buffer(self) -> Buffer;
+}
+
+macro_rules! native_buffer {
+ ($($t:ty),*) => {
+ $(impl IntoBuffer for Vec<$t> {
+ fn into_buffer(self) -> Buffer {
+ Buffer::from_vec(self)
+ }
+ })*
+ };
+}
+native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);
+
+impl IntoBuffer for Vec<bool> {
+ fn into_buffer(self) -> Buffer {
+ BooleanBuffer::from_iter(self).into_inner()
+ }
+}
+
+impl IntoBuffer for Vec<Int96> {
+ fn into_buffer(self) -> Buffer {
+ let mut builder = TimestampNanosecondBufferBuilder::new(self.len());
+ for v in self {
+ builder.append(v.to_nanos())
+ }
+ builder.finish()
+ }
+}
+
/// Primitive array readers are leaves of array reader tree. They accept page
iterator
/// and read them into primitive arrays.
pub struct PrimitiveArrayReader<T>
where
T: DataType,
- T::T: ScalarValue,
+ T::T: Copy + Default,
+ Vec<T::T>: IntoBuffer,
{
data_type: ArrowType,
pages: Box<dyn PageIterator>,
@@ -53,7 +85,8 @@ where
impl<T> PrimitiveArrayReader<T>
where
T: DataType,
- T::T: ScalarValue,
+ T::T: Copy + Default,
+ Vec<T::T>: IntoBuffer,
{
/// Construct primitive array reader.
pub fn new(
@@ -85,7 +118,8 @@ where
impl<T> ArrayReader for PrimitiveArrayReader<T>
where
T: DataType,
- T::T: ScalarValue,
+ T::T: Copy + Default,
+ Vec<T::T>: IntoBuffer,
{
fn as_any(&self) -> &dyn Any {
self
@@ -131,40 +165,14 @@ where
_ => unreachable!("INT96 must be timestamp nanosecond"),
},
PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
- unreachable!(
- "PrimitiveArrayReaders don't support complex physical
types"
- );
+ unreachable!("PrimitiveArrayReaders don't support complex
physical types");
}
};
// Convert to arrays by using the Parquet physical type.
// The physical types are then cast to Arrow types if necessary
- let record_data = self.record_reader.consume_record_data();
- let record_data = match T::get_physical_type() {
- PhysicalType::BOOLEAN => {
- let mut boolean_buffer =
BooleanBufferBuilder::new(record_data.len());
-
- for e in record_data.as_slice() {
- boolean_buffer.append(*e > 0);
- }
- boolean_buffer.into()
- }
- PhysicalType::INT96 => {
- // SAFETY - record_data is an aligned buffer of Int96
- let (prefix, slice, suffix) =
- unsafe { record_data.as_slice().align_to::<Int96>() };
- assert!(prefix.is_empty() && suffix.is_empty());
-
- let mut builder =
TimestampNanosecondBufferBuilder::new(slice.len());
- for v in slice {
- builder.append(v.to_nanos())
- }
-
- builder.finish()
- }
- _ => record_data,
- };
+ let record_data =
self.record_reader.consume_record_data().into_buffer();
let array_data = ArrayDataBuilder::new(arrow_data_type)
.len(self.record_reader.num_values())
@@ -188,9 +196,7 @@ where
PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
PhysicalType::INT96 =>
Arc::new(TimestampNanosecondArray::from(array_data)),
PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
- unreachable!(
- "PrimitiveArrayReaders don't support complex physical
types"
- );
+ unreachable!("PrimitiveArrayReaders don't support complex
physical types");
}
};
@@ -409,12 +415,9 @@ mod tests {
);
let page_iterator = InMemoryPageIterator::new(page_lists);
- let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
- Box::new(page_iterator),
- column_desc,
- None,
- )
- .unwrap();
+ let mut array_reader =
+
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc,
None)
+ .unwrap();
// Read first 50 values, which are all from the first column chunk
let array = array_reader.next_batch(50).unwrap();
@@ -618,12 +621,9 @@ mod tests {
let page_iterator = InMemoryPageIterator::new(page_lists);
- let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
- Box::new(page_iterator),
- column_desc,
- None,
- )
- .unwrap();
+ let mut array_reader =
+
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc,
None)
+ .unwrap();
let mut accu_len: usize = 0;
@@ -697,12 +697,9 @@ mod tests {
);
let page_iterator = InMemoryPageIterator::new(page_lists);
- let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
- Box::new(page_iterator),
- column_desc,
- None,
- )
- .unwrap();
+ let mut array_reader =
+
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc,
None)
+ .unwrap();
// read data from the reader
// the data type is decimal(8,2)
@@ -759,12 +756,9 @@ mod tests {
);
let page_iterator = InMemoryPageIterator::new(page_lists);
- let mut array_reader = PrimitiveArrayReader::<Int64Type>::new(
- Box::new(page_iterator),
- column_desc,
- None,
- )
- .unwrap();
+ let mut array_reader =
+
PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc,
None)
+ .unwrap();
// read data from the reader
// the data type is decimal(18,4)
diff --git a/parquet/src/arrow/buffer/dictionary_buffer.rs
b/parquet/src/arrow/buffer/dictionary_buffer.rs
index 4208318122..d0f63024ed 100644
--- a/parquet/src/arrow/buffer/dictionary_buffer.rs
+++ b/parquet/src/arrow/buffer/dictionary_buffer.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::arrow::buffer::offset_buffer::OffsetBuffer;
-use crate::arrow::record_reader::buffer::{BufferQueue, ScalarBuffer,
ScalarValue, ValuesBuffer};
+use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer};
use crate::column::reader::decoder::ValuesBufferSlice;
use crate::errors::{ParquetError, Result};
use arrow_array::{make_array, Array, ArrayRef, OffsetSizeTrait};
@@ -27,17 +27,12 @@ use std::sync::Arc;
/// An array of variable length byte arrays that are potentially dictionary
encoded
/// and can be converted into a corresponding [`ArrayRef`]
-pub enum DictionaryBuffer<K: ScalarValue, V: ScalarValue> {
- Dict {
- keys: ScalarBuffer<K>,
- values: ArrayRef,
- },
- Values {
- values: OffsetBuffer<V>,
- },
+pub enum DictionaryBuffer<K: ArrowNativeType, V: OffsetSizeTrait> {
+ Dict { keys: Vec<K>, values: ArrayRef },
+ Values { values: OffsetBuffer<V> },
}
-impl<K: ScalarValue, V: ScalarValue> Default for DictionaryBuffer<K, V> {
+impl<K: ArrowNativeType, V: OffsetSizeTrait> Default for DictionaryBuffer<K,
V> {
fn default() -> Self {
Self::Values {
values: Default::default(),
@@ -45,9 +40,7 @@ impl<K: ScalarValue, V: ScalarValue> Default for
DictionaryBuffer<K, V> {
}
}
-impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue + OffsetSizeTrait>
- DictionaryBuffer<K, V>
-{
+impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait> DictionaryBuffer<K, V> {
#[allow(unused)]
pub fn len(&self) -> usize {
match self {
@@ -63,7 +56,7 @@ impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue +
OffsetSizeTrait>
/// # Panic
///
/// Panics if the dictionary is too large for `K`
- pub fn as_keys(&mut self, dictionary: &ArrayRef) -> Option<&mut
ScalarBuffer<K>> {
+ pub fn as_keys(&mut self, dictionary: &ArrayRef) -> Option<&mut Vec<K>> {
assert!(K::from_usize(dictionary.len()).is_some());
match self {
@@ -112,7 +105,7 @@ impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue
+ OffsetSizeTrait>
if values.is_empty() {
// If dictionary is empty, zero pad offsets
- spilled.offsets.resize(keys.len() + 1);
+ spilled.offsets.resize(keys.len() + 1, V::default());
} else {
// Note: at this point null positions will have arbitrary
dictionary keys
// and this will hydrate them to the corresponding byte
array. This is
@@ -164,7 +157,7 @@ impl<K: ScalarValue + ArrowNativeType + Ord, V: ScalarValue
+ OffsetSizeTrait>
let builder = ArrayDataBuilder::new(data_type.clone())
.len(keys.len())
- .add_buffer(keys.into())
+ .add_buffer(Buffer::from_vec(keys))
.add_child_data(values.into_data())
.null_bit_buffer(null_buffer);
@@ -192,13 +185,13 @@ impl<K: ScalarValue + ArrowNativeType + Ord, V:
ScalarValue + OffsetSizeTrait>
}
}
-impl<K: ScalarValue, V: ScalarValue> ValuesBufferSlice for DictionaryBuffer<K,
V> {
+impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBufferSlice for
DictionaryBuffer<K, V> {
fn capacity(&self) -> usize {
usize::MAX
}
}
-impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> ValuesBuffer for
DictionaryBuffer<K, V> {
+impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for
DictionaryBuffer<K, V> {
fn pad_nulls(
&mut self,
read_offset: usize,
@@ -208,7 +201,7 @@ impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait>
ValuesBuffer for Dictiona
) {
match self {
Self::Dict { keys, .. } => {
- keys.resize(read_offset + levels_read);
+ keys.resize(read_offset + levels_read, K::default());
keys.pad_nulls(read_offset, values_read, levels_read,
valid_mask)
}
Self::Values { values, .. } => {
@@ -218,7 +211,7 @@ impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait>
ValuesBuffer for Dictiona
}
}
-impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait> BufferQueue for
DictionaryBuffer<K, V> {
+impl<K: ArrowNativeType, V: OffsetSizeTrait> BufferQueue for
DictionaryBuffer<K, V> {
type Output = Self;
type Slice = Self;
@@ -234,14 +227,14 @@ impl<K: ScalarValue, V: ScalarValue + OffsetSizeTrait>
BufferQueue for Dictionar
}
}
- fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+ fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice {
self
}
- fn set_len(&mut self, len: usize) {
+ fn truncate_buffer(&mut self, len: usize) {
match self {
- Self::Dict { keys, .. } => keys.set_len(len),
- Self::Values { values } => values.set_len(len),
+ Self::Dict { keys, .. } => keys.truncate_buffer(len),
+ Self::Values { values } => values.truncate_buffer(len),
}
}
}
diff --git a/parquet/src/arrow/buffer/offset_buffer.rs
b/parquet/src/arrow/buffer/offset_buffer.rs
index 3f8f85494f..459c94ed28 100644
--- a/parquet/src/arrow/buffer/offset_buffer.rs
+++ b/parquet/src/arrow/buffer/offset_buffer.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::arrow::buffer::bit_util::iter_set_bits_rev;
-use crate::arrow::record_reader::buffer::{BufferQueue, ScalarBuffer,
ScalarValue, ValuesBuffer};
+use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer};
use crate::column::reader::decoder::ValuesBufferSlice;
use crate::errors::{ParquetError, Result};
use arrow_array::{make_array, ArrayRef, OffsetSizeTrait};
@@ -27,23 +27,23 @@ use arrow_schema::DataType as ArrowType;
/// A buffer of variable-sized byte arrays that can be converted into
/// a corresponding [`ArrayRef`]
#[derive(Debug)]
-pub struct OffsetBuffer<I: ScalarValue> {
- pub offsets: ScalarBuffer<I>,
- pub values: ScalarBuffer<u8>,
+pub struct OffsetBuffer<I: OffsetSizeTrait> {
+ pub offsets: Vec<I>,
+ pub values: Vec<u8>,
}
-impl<I: ScalarValue> Default for OffsetBuffer<I> {
+impl<I: OffsetSizeTrait> Default for OffsetBuffer<I> {
fn default() -> Self {
- let mut offsets = ScalarBuffer::new();
- offsets.resize(1);
+ let mut offsets = Vec::new();
+ offsets.resize(1, I::default());
Self {
offsets,
- values: ScalarBuffer::new(),
+ values: Vec::new(),
}
}
}
-impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
+impl<I: OffsetSizeTrait> OffsetBuffer<I> {
/// Returns the number of byte arrays in this buffer
pub fn len(&self) -> usize {
self.offsets.len() - 1
@@ -128,8 +128,8 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
pub fn into_array(self, null_buffer: Option<Buffer>, data_type: ArrowType)
-> ArrayRef {
let array_data_builder = ArrayDataBuilder::new(data_type)
.len(self.len())
- .add_buffer(self.offsets.into())
- .add_buffer(self.values.into())
+ .add_buffer(Buffer::from_vec(self.offsets))
+ .add_buffer(Buffer::from_vec(self.values))
.null_bit_buffer(null_buffer);
let data = match cfg!(debug_assertions) {
@@ -141,7 +141,7 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
}
}
-impl<I: OffsetSizeTrait + ScalarValue> BufferQueue for OffsetBuffer<I> {
+impl<I: OffsetSizeTrait> BufferQueue for OffsetBuffer<I> {
type Output = Self;
type Slice = Self;
@@ -149,16 +149,16 @@ impl<I: OffsetSizeTrait + ScalarValue> BufferQueue for
OffsetBuffer<I> {
std::mem::take(self)
}
- fn spare_capacity_mut(&mut self, _batch_size: usize) -> &mut Self::Slice {
+ fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice {
self
}
- fn set_len(&mut self, len: usize) {
+ fn truncate_buffer(&mut self, len: usize) {
assert_eq!(self.offsets.len(), len + 1);
}
}
-impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for OffsetBuffer<I> {
+impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
fn pad_nulls(
&mut self,
read_offset: usize,
@@ -167,9 +167,10 @@ impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for
OffsetBuffer<I> {
valid_mask: &[u8],
) {
assert_eq!(self.offsets.len(), read_offset + values_read + 1);
- self.offsets.resize(read_offset + levels_read + 1);
+ self.offsets
+ .resize(read_offset + levels_read + 1, I::default());
- let offsets = self.offsets.as_slice_mut();
+ let offsets = &mut self.offsets;
let mut last_pos = read_offset + levels_read + 1;
let mut last_start_offset = I::from_usize(self.values.len()).unwrap();
@@ -207,7 +208,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ValuesBuffer for OffsetBuffer<I> {
}
}
-impl<I: ScalarValue> ValuesBufferSlice for OffsetBuffer<I> {
+impl<I: OffsetSizeTrait> ValuesBufferSlice for OffsetBuffer<I> {
fn capacity(&self) -> usize {
usize::MAX
}
diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs
index 35a322e6c7..3914710ff7 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -15,11 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use std::marker::PhantomData;
-
use crate::arrow::buffer::bit_util::iter_set_bits_rev;
-use crate::data_type::Int96;
-use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
/// A buffer that supports writing new data to the end, and removing data from
the front
///
@@ -37,12 +33,12 @@ pub trait BufferQueue: Sized {
/// to append data to the end of this [`BufferQueue`]
///
/// NB: writes to the returned slice will not update the length of
[`BufferQueue`]
- /// instead a subsequent call should be made to [`BufferQueue::set_len`]
- fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice;
+ /// instead a subsequent call should be made to
[`BufferQueue::truncate_buffer`]
+ fn get_output_slice(&mut self, batch_size: usize) -> &mut Self::Slice;
/// Sets the length of the [`BufferQueue`].
///
- /// Intended to be used in combination with
[`BufferQueue::spare_capacity_mut`]
+ /// Intended to be used in combination with
[`BufferQueue::get_output_slice`]
///
/// # Panics
///
@@ -57,132 +53,27 @@ pub trait BufferQueue: Sized {
/// track how much of this slice is actually written to by the caller.
This is still
/// safe as the slice is default-initialized.
///
- fn set_len(&mut self, len: usize);
-}
-
-/// A marker trait for [scalar] types
-///
-/// This means that a `[Self::default()]` of length `len` can be safely
created from a
-/// zero-initialized `[u8]` with length `len * std::mem::size_of::<Self>()` and
-/// alignment of `std::mem::size_of::<Self>()`
-///
-/// [scalar]:
https://doc.rust-lang.org/book/ch03-02-data-types.html#scalar-types
-///
-pub trait ScalarValue: Copy {}
-impl ScalarValue for bool {}
-impl ScalarValue for u8 {}
-impl ScalarValue for i8 {}
-impl ScalarValue for u16 {}
-impl ScalarValue for i16 {}
-impl ScalarValue for u32 {}
-impl ScalarValue for i32 {}
-impl ScalarValue for u64 {}
-impl ScalarValue for i64 {}
-impl ScalarValue for f32 {}
-impl ScalarValue for f64 {}
-impl ScalarValue for Int96 {}
-
-/// A typed buffer similar to [`Vec<T>`] but using [`MutableBuffer`] for
storage
-#[derive(Debug)]
-pub struct ScalarBuffer<T: ScalarValue> {
- buffer: MutableBuffer,
-
- /// Length in elements of size T
- len: usize,
-
- /// Placeholder to allow `T` as an invariant generic parameter
- /// without making it !Send
- _phantom: PhantomData<fn(T) -> T>,
-}
-
-impl<T: ScalarValue> Default for ScalarBuffer<T> {
- fn default() -> Self {
- Self::new()
- }
-}
-
-impl<T: ScalarValue> ScalarBuffer<T> {
- pub fn new() -> Self {
- Self {
- buffer: MutableBuffer::new(0),
- len: 0,
- _phantom: Default::default(),
- }
- }
-
- pub fn len(&self) -> usize {
- self.len
- }
-
- pub fn is_empty(&self) -> bool {
- self.len == 0
- }
-
- pub fn reserve(&mut self, additional: usize) {
- self.buffer.reserve(additional * std::mem::size_of::<T>());
- }
-
- pub fn resize(&mut self, len: usize) {
- self.buffer.resize(len * std::mem::size_of::<T>(), 0);
- self.len = len;
- }
-
- #[inline]
- pub fn as_slice(&self) -> &[T] {
- let (prefix, buf, suffix) = unsafe {
self.buffer.as_slice().align_to::<T>() };
- assert!(prefix.is_empty() && suffix.is_empty());
- buf
- }
-
- #[inline]
- pub fn as_slice_mut(&mut self) -> &mut [T] {
- let (prefix, buf, suffix) = unsafe {
self.buffer.as_slice_mut().align_to_mut::<T>() };
- assert!(prefix.is_empty() && suffix.is_empty());
- buf
- }
+ fn truncate_buffer(&mut self, len: usize);
}
-impl<T: ScalarValue + ArrowNativeType> ScalarBuffer<T> {
- pub fn push(&mut self, v: T) {
- self.buffer.push(v);
- self.len += 1;
- }
-
- pub fn extend_from_slice(&mut self, v: &[T]) {
- self.buffer.extend_from_slice(v);
- self.len += v.len();
- }
-}
-
-impl<T: ScalarValue> From<ScalarBuffer<T>> for Buffer {
- fn from(t: ScalarBuffer<T>) -> Self {
- t.buffer.into()
- }
-}
-
-impl<T: ScalarValue> BufferQueue for ScalarBuffer<T> {
- type Output = Buffer;
+impl<T: Copy + Default> BufferQueue for Vec<T> {
+ type Output = Self;
type Slice = [T];
fn consume(&mut self) -> Self::Output {
- std::mem::take(self).into()
+ std::mem::take(self)
}
- fn spare_capacity_mut(&mut self, batch_size: usize) -> &mut Self::Slice {
- self.buffer
- .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);
-
- let range = self.len..self.len + batch_size;
- &mut self.as_slice_mut()[range]
+ fn get_output_slice(&mut self, batch_size: usize) -> &mut Self::Slice {
+ let len = self.len();
+ self.resize(len + batch_size, T::default());
+ &mut self[len..]
}
- fn set_len(&mut self, len: usize) {
- self.len = len;
-
- let new_bytes = self.len * std::mem::size_of::<T>();
- assert!(new_bytes <= self.buffer.len());
- self.buffer.resize(new_bytes, 0);
+ fn truncate_buffer(&mut self, len: usize) {
+ assert!(len <= self.len());
+ self.truncate(len)
}
}
@@ -212,7 +103,7 @@ pub trait ValuesBuffer: BufferQueue {
);
}
-impl<T: ScalarValue> ValuesBuffer for ScalarBuffer<T> {
+impl<T: Copy + Default> ValuesBuffer for Vec<T> {
fn pad_nulls(
&mut self,
read_offset: usize,
@@ -220,8 +111,7 @@ impl<T: ScalarValue> ValuesBuffer for ScalarBuffer<T> {
levels_read: usize,
valid_mask: &[u8],
) {
- let slice = self.as_slice_mut();
- assert!(slice.len() >= read_offset + levels_read);
+ assert!(self.len() >= read_offset + levels_read);
let values_range = read_offset..read_offset + values_read;
for (value_pos, level_pos) in
values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
@@ -229,7 +119,7 @@ impl<T: ScalarValue> ValuesBuffer for ScalarBuffer<T> {
if level_pos <= value_pos {
break;
}
- slice[level_pos] = slice[value_pos];
+ self[level_pos] = self[value_pos];
}
}
}
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs
index 9009c596c4..fa041f5fdb 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -30,12 +30,10 @@ use crate::column::reader::decoder::{
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
-use super::buffer::ScalarBuffer;
-
enum BufferInner {
/// Compute levels and null mask
Full {
- levels: ScalarBuffer<i16>,
+ levels: Vec<i16>,
nulls: BooleanBufferBuilder,
max_level: i16,
},
@@ -77,7 +75,7 @@ impl DefinitionLevelBuffer {
}
}
false => BufferInner::Full {
- levels: ScalarBuffer::new(),
+ levels: Vec::new(),
nulls: BooleanBufferBuilder::new(0),
max_level: desc.max_def_level(),
},
@@ -89,7 +87,7 @@ impl DefinitionLevelBuffer {
/// Returns the built level data
pub fn consume_levels(&mut self) -> Option<Buffer> {
match &mut self.inner {
- BufferInner::Full { levels, .. } =>
Some(std::mem::take(levels).into()),
+ BufferInner::Full { levels, .. } =>
Some(Buffer::from_vec(std::mem::take(levels))),
BufferInner::Mask { .. } => None,
}
}
@@ -174,9 +172,9 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
assert_eq!(self.max_level, *max_level);
assert_eq!(range.start + writer.len, nulls.len());
- levels.resize(range.end + writer.len);
+ levels.resize(range.end + writer.len, 0);
- let slice = &mut levels.as_slice_mut()[writer.len..];
+ let slice = &mut levels[writer.len..];
let levels_read = decoder.read_def_levels(slice,
range.clone())?;
nulls.reserve(levels_read);
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index ea98234199..49c69c87e3 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -18,7 +18,7 @@
use arrow_buffer::Buffer;
use crate::arrow::record_reader::{
- buffer::{BufferQueue, ScalarBuffer, ValuesBuffer},
+ buffer::{BufferQueue, ValuesBuffer},
definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder},
};
use crate::column::reader::decoder::RepetitionLevelDecoderImpl;
@@ -37,8 +37,7 @@ pub(crate) mod buffer;
mod definition_levels;
/// A `RecordReader` is a stateful column reader that delimits semantic
records.
-pub type RecordReader<T> =
- GenericRecordReader<ScalarBuffer<<T as DataType>::T>,
ColumnValueDecoderImpl<T>>;
+pub type RecordReader<T> = GenericRecordReader<Vec<<T as DataType>::T>,
ColumnValueDecoderImpl<T>>;
pub(crate) type ColumnReader<CV> =
GenericColumnReader<RepetitionLevelDecoderImpl,
DefinitionLevelBufferDecoder, CV>;
@@ -53,7 +52,7 @@ pub struct GenericRecordReader<V, CV> {
values: V,
def_levels: Option<DefinitionLevelBuffer>,
- rep_levels: Option<ScalarBuffer<i16>>,
+ rep_levels: Option<Vec<i16>>,
column_reader: Option<ColumnReader<CV>>,
/// Number of buffered levels / null-padded values
num_values: usize,
@@ -81,7 +80,7 @@ where
let def_levels = (desc.max_def_level() > 0)
.then(|| DefinitionLevelBuffer::new(&desc,
packed_null_mask(&desc)));
- let rep_levels = (desc.max_rep_level() > 0).then(ScalarBuffer::new);
+ let rep_levels = (desc.max_rep_level() > 0).then(Vec::new);
Self {
values: records,
@@ -174,7 +173,9 @@ where
/// Return repetition level data.
/// The side effect is similar to `consume_def_levels`.
pub fn consume_rep_levels(&mut self) -> Option<Buffer> {
- self.rep_levels.as_mut().map(|x| x.consume())
+ self.rep_levels
+ .as_mut()
+ .map(|x| Buffer::from_vec(x.consume()))
}
/// Returns currently stored buffer data.
@@ -209,9 +210,9 @@ where
let rep_levels = self
.rep_levels
.as_mut()
- .map(|levels| levels.spare_capacity_mut(batch_size));
+ .map(|levels| levels.get_output_slice(batch_size));
let def_levels = self.def_levels.as_mut();
- let values = self.values.spare_capacity_mut(batch_size);
+ let values = self.values.get_output_slice(batch_size);
let (records_read, values_read, levels_read) = self
.column_reader
@@ -234,9 +235,9 @@ where
self.num_records += records_read;
self.num_values += levels_read;
- self.values.set_len(self.num_values);
+ self.values.truncate_buffer(self.num_values);
if let Some(ref mut buf) = self.rep_levels {
- buf.set_len(self.num_values)
+ buf.truncate_buffer(self.num_values)
};
if let Some(ref mut buf) = self.def_levels {
buf.set_len(self.num_values)
@@ -257,7 +258,7 @@ mod tests {
use std::sync::Arc;
use arrow::buffer::Buffer;
- use arrow_array::builder::{Int16BufferBuilder, Int32BufferBuilder};
+ use arrow_array::builder::Int16BufferBuilder;
use crate::basic::Encoding;
use crate::data_type::Int32Type;
@@ -334,10 +335,7 @@ mod tests {
assert_eq!(7, record_reader.num_values());
}
- let mut bb = Int32BufferBuilder::new(7);
- bb.append_slice(&[4, 7, 6, 3, 2, 8, 9]);
- let expected_buffer = bb.finish();
- assert_eq!(expected_buffer, record_reader.consume_record_data());
+ assert_eq!(record_reader.consume_record_data(), &[4, 7, 6, 3, 2, 8,
9]);
assert_eq!(None, record_reader.consume_def_levels());
assert_eq!(None, record_reader.consume_bitmap());
}
@@ -434,13 +432,12 @@ mod tests {
// Verify result record data
let actual = record_reader.consume_record_data();
- let actual_values = actual.typed_data::<i32>();
let expected = &[0, 7, 0, 6, 3, 0, 8];
- assert_eq!(actual_values.len(), expected.len());
+ assert_eq!(actual.len(), expected.len());
// Only validate valid values are equal
- let iter = expected_valid.iter().zip(actual_values).zip(expected);
+ let iter = expected_valid.iter().zip(&actual).zip(expected);
for ((valid, actual), expected) in iter {
if *valid {
assert_eq!(actual, expected)
@@ -544,12 +541,11 @@ mod tests {
// Verify result record data
let actual = record_reader.consume_record_data();
- let actual_values = actual.typed_data::<i32>();
let expected = &[4, 0, 0, 7, 6, 3, 2, 8, 9];
- assert_eq!(actual_values.len(), expected.len());
+ assert_eq!(actual.len(), expected.len());
// Only validate valid values are equal
- let iter = expected_valid.iter().zip(actual_values).zip(expected);
+ let iter = expected_valid.iter().zip(&actual).zip(expected);
for ((valid, actual), expected) in iter {
if *valid {
assert_eq!(actual, expected)
@@ -713,10 +709,7 @@ mod tests {
assert_eq!(0, record_reader.read_records(10).unwrap());
}
- let mut bb = Int32BufferBuilder::new(3);
- bb.append_slice(&[6, 3, 2]);
- let expected_buffer = bb.finish();
- assert_eq!(expected_buffer, record_reader.consume_record_data());
+ assert_eq!(record_reader.consume_record_data(), &[6, 3, 2]);
assert_eq!(None, record_reader.consume_def_levels());
assert_eq!(None, record_reader.consume_bitmap());
}
@@ -814,13 +807,12 @@ mod tests {
// Verify result record data
let actual = record_reader.consume_record_data();
- let actual_values = actual.typed_data::<i32>();
let expected = &[0, 6, 3];
- assert_eq!(actual_values.len(), expected.len());
+ assert_eq!(actual.len(), expected.len());
// Only validate valid values are equal
- let iter = expected_valid.iter().zip(actual_values).zip(expected);
+ let iter = expected_valid.iter().zip(&actual).zip(expected);
for ((valid, actual), expected) in iter {
if *valid {
assert_eq!(actual, expected)