tustvold commented on a change in pull request #1041:
URL: https://github.com/apache/arrow-rs/pull/1041#discussion_r767699564
##########
File path: parquet/src/arrow/record_reader.rs
##########
@@ -16,75 +16,248 @@
// under the License.
use std::cmp::{max, min};
-use std::mem::{replace, size_of};
-
-use crate::column::{page::PageReader, reader::ColumnReaderImpl};
+use std::marker::PhantomData;
+use std::mem::replace;
+use std::ops::Range;
+
+use crate::arrow::record_reader::private::{
+ DefinitionLevels, RecordBuffer, RepetitionLevels,
+};
+use crate::column::{
+ page::PageReader,
+ reader::{
+ private::{
+ ColumnLevelDecoder, ColumnLevelDecoderImpl, ColumnValueDecoder,
+ ColumnValueDecoderImpl,
+ },
+ GenericColumnReader,
+ },
+};
use crate::data_type::DataType;
-use crate::errors::{ParquetError, Result};
+use crate::errors::Result;
use crate::schema::types::ColumnDescPtr;
use arrow::array::BooleanBufferBuilder;
use arrow::bitmap::Bitmap;
use arrow::buffer::{Buffer, MutableBuffer};
+pub(crate) mod private {
+ use super::*;
+
+ pub trait RecordBuffer: Sized + Default {
+ type Output: Sized;
+
+ type Writer: ?Sized;
+
+ /// Split out `len` items
+ fn split(&mut self, len: usize) -> Self::Output;
+
+ /// Get a writer with `batch_size` capacity
+ fn writer(&mut self, batch_size: usize) -> &mut Self::Writer;
+
+ /// Record a write of `len` items
+ fn commit(&mut self, len: usize);
+ }
+
+ pub trait RepetitionLevels: RecordBuffer {
+ /// Inspects the buffered repetition levels in `range` and returns the number of
+ /// "complete" records along with the corresponding number of values
+ ///
+ /// A "complete" record is one where the buffer contains a subsequent repetition level of 0
+ fn count_records(
+ &self,
+ range: Range<usize>,
+ max_records: usize,
+ ) -> (usize, usize);
+ }
+
+ pub trait DefinitionLevels: RecordBuffer {
+ /// Update the provided validity mask based on contained levels
+ fn update_valid_mask(
+ &self,
+ valid: &mut BooleanBufferBuilder,
+ range: Range<usize>,
+ max_level: i16,
+ );
+ }
+
+ pub struct TypedBuffer<T> {
+ buffer: MutableBuffer,
+
+ /// Length in elements of size T
+ len: usize,
+
+ /// Placeholder to allow `T` as an invariant generic parameter
+ _phantom: PhantomData<*mut T>,
+ }
+
+ impl<T> Default for TypedBuffer<T> {
+ fn default() -> Self {
+ Self {
+ buffer: MutableBuffer::new(0),
+ len: 0,
+ _phantom: Default::default(),
+ }
+ }
+ }
+
+ impl<T> RecordBuffer for TypedBuffer<T> {
+ type Output = Buffer;
+
+ type Writer = [T];
+
+ fn split(&mut self, len: usize) -> Self::Output {
+ let num_bytes = len * std::mem::size_of::<T>();
+ let remaining_bytes = self.buffer.len() - num_bytes;
+ // TODO: Optimize to reduce the copy
+ // create an empty buffer, as it will be resized below
+ let mut remaining = MutableBuffer::new(0);
+ remaining.resize(remaining_bytes, 0);
+
+ let new_records = remaining.as_slice_mut();
+
+ new_records[0..remaining_bytes]
+ .copy_from_slice(&self.buffer.as_slice()[num_bytes..]);
+
+ self.buffer.resize(num_bytes, 0);
+
+ replace(&mut self.buffer, remaining).into()
+ }
+
+ fn writer(&mut self, batch_size: usize) -> &mut Self::Writer {
+ self.buffer
+ .resize((self.len + batch_size) * std::mem::size_of::<T>(), 0);
+
+ let (prefix, values, suffix) =
+ unsafe { self.buffer.as_slice_mut().align_to_mut::<T>() };
+ assert!(prefix.is_empty() && suffix.is_empty());
+
+ &mut values[self.len..self.len + batch_size]
+ }
+
+ fn commit(&mut self, len: usize) {
+ self.len = len;
+
+ let new_bytes = self.len * std::mem::size_of::<T>();
+ assert!(new_bytes <= self.buffer.len());
+ self.buffer.resize(new_bytes, 0);
+ }
+ }
+
+ impl RepetitionLevels for TypedBuffer<i16> {
+ fn count_records(
+ &self,
+ range: Range<usize>,
+ max_records: usize,
+ ) -> (usize, usize) {
+ let (prefix, buf, suffix) =
+ unsafe { self.buffer.as_slice().align_to::<i16>() };
+ assert!(prefix.is_empty() && suffix.is_empty());
+
+ let start = range.start;
+ let mut records_read = 0;
+ let mut end_of_last_record = start;
+
+ for current in range {
+ if buf[current] == 0 && current != end_of_last_record {
+ records_read += 1;
+ end_of_last_record = current;
+
+ if records_read == max_records {
+ break;
+ }
+ }
+ }
+
+ (records_read, end_of_last_record - start)
+ }
+ }
+
+ impl DefinitionLevels for TypedBuffer<i16> {
+ fn update_valid_mask(
+ &self,
+ null_mask: &mut BooleanBufferBuilder,
+ range: Range<usize>,
+ max_level: i16,
+ ) {
+ let (prefix, buf, suffix) =
+ unsafe { self.buffer.as_slice().align_to::<i16>() };
+ assert!(prefix.is_empty() && suffix.is_empty());
+
+ for i in &buf[range] {
+ null_mask.append(*i == max_level)
+ }
+ }
+ }
+}
+
const MIN_BATCH_SIZE: usize = 1024;
/// A `RecordReader` is a stateful column reader that delimits semantic records.
-pub struct RecordReader<T: DataType> {
+pub type RecordReader<T> = GenericRecordReader<
+ private::TypedBuffer<i16>,
+ private::TypedBuffer<i16>,
+ private::TypedBuffer<<T as DataType>::T>,
+ ColumnLevelDecoderImpl,
+ ColumnLevelDecoderImpl,
+ ColumnValueDecoderImpl<T>,
+>;
+
+#[doc(hidden)]
Review comment:
This type is hidden from the docs, and the private module makes it
impossible for users to construct it directly. The direct implication is that
we can change this type signature however we want without it leaking into
downstream crates.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]