alamb commented on code in PR #9831:
URL: https://github.com/apache/arrow-rs/pull/9831#discussion_r3173393237
##########
parquet/src/column/writer/mod.rs:
##########
@@ -516,7 +574,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E>
{
def_levels: Option<&[i16]>,
rep_levels: Option<&[i16]>,
) -> Result<usize> {
- self.write_batch_internal(values, None, def_levels, rep_levels, None,
None, None)
+ self.write_batch_internal(
+ values,
+ None,
+ def_levels.map_or(LevelDataRef::Absent,
LevelDataRef::Materialized),
Review Comment:
It might be easier to grok this logic and make it clearer how LevelDataRef
is constructed if you added `From` impls rather than constructing it
directly here.
Something like this
```rust
impl From<&[i16]> for LevelDataRef {
...
}
```
Then you could write this like
```rust
self.write_batch_internal(
values,
None,
LevelDataRef::from(def_levels),
LevelDataRef::from(rep_levels),
None,
None,
None,
)
```
A similar pattern could be applied to all sites where LevelDataRef is created
##########
parquet/src/encodings/levels.rs:
##########
@@ -92,6 +92,33 @@ impl LevelEncoder {
}
}
+ /// Encode `count` repetitions of a single level value, calling
+ /// `observer(value, count)` exactly once.
+ ///
+ /// This is O(1) amortized for RLE-based encoders (after a small warmup).
Review Comment:
It took me a while to understand how / why this function is different than
`put_with_observer` -- I think it would help future readers to explain a bit
more in comments.
Something like this maybe:
```suggestion
/// This can be used to encode uniform runs without
/// allocating a level buffer. It is O(1) amortized
/// for RLE-based encoders (after a small warmup).
///
/// Like [`Self::put_with_observer`], but specialized for a
/// single repeated level value.
```
##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -754,17 +734,129 @@ impl LevelInfoBuilder {
/// The data necessary to write a primitive Arrow array to parquet, taking
into account
/// any non-primitive parents it may have in the arrow representation
+#[derive(Debug, Clone)]
+pub(crate) enum LevelData {
+ Absent,
+ Materialized(Vec<i16>),
+ Uniform { value: i16, count: usize },
+}
+
+impl PartialEq for LevelData {
+ fn eq(&self, other: &Self) -> bool {
+ match (self, other) {
+ (Self::Absent, Self::Absent) => true,
+ (Self::Materialized(a), Self::Materialized(b)) => a == b,
+ (Self::Uniform { value: v, count: n }, Self::Materialized(b))
Review Comment:
It was not clear to me without reading this more carefully why a manual
`PartialEq impl` was needed -- maybe some comments would help at the top
```rust
/// Manual impl checks logical contents of level data
/// rather than physical representation
```
##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -754,17 +734,129 @@ impl LevelInfoBuilder {
/// The data necessary to write a primitive Arrow array to parquet, taking
into account
/// any non-primitive parents it may have in the arrow representation
+#[derive(Debug, Clone)]
+pub(crate) enum LevelData {
+ Absent,
+ Materialized(Vec<i16>),
+ Uniform { value: i16, count: usize },
+}
+
+impl PartialEq for LevelData {
+ fn eq(&self, other: &Self) -> bool {
+ match (self, other) {
+ (Self::Absent, Self::Absent) => true,
+ (Self::Materialized(a), Self::Materialized(b)) => a == b,
+ (Self::Uniform { value: v, count: n }, Self::Materialized(b))
+ | (Self::Materialized(b), Self::Uniform { value: v, count: n }) =>
{
+ b.len() == *n && b.iter().all(|x| x == v)
+ }
+ (
+ Self::Uniform {
+ value: v1,
+ count: n1,
+ },
+ Self::Uniform {
+ value: v2,
+ count: n2,
+ },
+ ) => v1 == v2 && n1 == n2,
+ _ => false,
+ }
+ }
+}
+
+impl Eq for LevelData {}
+
+impl LevelData {
+ fn new(present: bool) -> Self {
+ match present {
+ true => Self::Materialized(Vec::new()),
+ false => Self::Absent,
+ }
+ }
+
+ pub(crate) fn as_ref(&self) -> LevelDataRef<'_> {
+ match self {
+ Self::Absent => LevelDataRef::Absent,
+ Self::Materialized(values) => LevelDataRef::Materialized(values),
+ Self::Uniform { value, count } => LevelDataRef::Uniform {
+ value: *value,
+ count: *count,
+ },
+ }
+ }
+
+ pub(crate) fn slice(&self, offset: usize, len: usize) -> Self {
+ match self {
+ Self::Absent => Self::Absent,
+ Self::Materialized(values) =>
Self::Materialized(values[offset..offset + len].to_vec()),
Review Comment:
I was thinking that it might be possible to avoid this `to_vec()` allocation
if `slice` took an owned self
```rust
pub(crate) fn slice(self, offset: usize, len: usize) -> Self {
```
However, I couldn't find any existing callsites that immediately discard an
owned `LevelData`
##########
parquet/src/column/writer/mod.rs:
##########
@@ -650,64 +716,100 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a,
E> {
values_offset: usize,
value_indices: Option<&[usize]>,
num_levels: usize,
- def_levels: Option<&[i16]>,
- rep_levels: Option<&[i16]>,
+ def_levels: LevelDataRef<'_>,
+ rep_levels: LevelDataRef<'_>,
) -> Result<usize> {
// Process definition levels and determine how many values to write.
let values_to_write = if self.descr.max_def_level() > 0 {
- let levels = def_levels.ok_or_else(|| {
- general_err!(
- "Definition levels are required, because max definition
level = {}",
- self.descr.max_def_level()
- )
- })?;
-
- let mut values_to_write = 0usize;
let max_def = self.descr.max_def_level();
- let encoder = &mut self.def_levels_encoder;
- match self.page_metrics.definition_level_histogram.as_mut() {
- Some(histogram) => encoder.put_with_observer(levels, |level,
count| {
- values_to_write += count * (level == max_def) as usize;
- histogram.increment_by(level, count as i64);
- }),
- None => encoder.put_with_observer(levels, |level, count| {
- values_to_write += count * (level == max_def) as usize;
- }),
- };
- self.page_metrics.num_page_nulls += (levels.len() -
values_to_write) as u64;
- values_to_write
+ match def_levels {
+ LevelDataRef::Absent => {
+ return Err(general_err!(
+ "Definition levels are required, because max
definition level = {}",
+ self.descr.max_def_level()
+ ));
+ }
+ LevelDataRef::Materialized(levels) => {
+ let mut values_to_write = 0usize;
+ let encoder = &mut self.def_levels_encoder;
+ match
self.page_metrics.definition_level_histogram.as_mut() {
+ Some(histogram) => encoder.put_with_observer(levels,
|level, count| {
+ values_to_write += count * (level == max_def) as
usize;
+ histogram.increment_by(level, count as i64);
+ }),
+ None => encoder.put_with_observer(levels, |level,
count| {
+ values_to_write += count * (level == max_def) as
usize;
+ }),
+ };
+ self.page_metrics.num_page_nulls += (levels.len() -
values_to_write) as u64;
+ values_to_write
+ }
+ LevelDataRef::Uniform { value, count } => {
Review Comment:
I think this is the key part of the whole PR, right? A special case when the
levels information is the same (all null, all non null)
##########
parquet/src/column/writer/mod.rs:
##########
@@ -314,6 +314,54 @@ impl<T: Default> ColumnMetrics<T> {
}
}
+/// Borrowed view of level data, analogous to `&str` for `LevelData`'s
`String`.
+///
+/// This type exists so that [`GenericColumnWriter::write_batch_internal`] can
accept
+/// level data from two callers without allocating: the public
[`GenericColumnWriter::write_batch`]
+/// API wraps caller-provided `&[i16]` slices directly as `Materialized`,
while the Arrow
+/// writer path converts owned `LevelData` via `.as_ref()` (which may also
produce `Uniform`).
Review Comment:
I found this a little hard to grok -- in particular the key thing it took me
a while to understand is that this structure is needed to create a `LevelData`
enum from a pre-existing `&[i16]`.
Maybe something like this would be easier to understand
```suggestion
/// Borrowed view of level data, analogous to `&str` for `LevelData`'s
`String`.
///
/// `LevelDataRef` can be constructed from `LevelData` AND directly from an
existing
/// `&[i16]` without allocating.
```
Bonus points for adding some rustdoc example code for constructing.
##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -754,17 +734,129 @@ impl LevelInfoBuilder {
/// The data necessary to write a primitive Arrow array to parquet, taking
into account
/// any non-primitive parents it may have in the arrow representation
+#[derive(Debug, Clone)]
+pub(crate) enum LevelData {
+ Absent,
+ Materialized(Vec<i16>),
+ Uniform { value: i16, count: usize },
+}
+
+impl PartialEq for LevelData {
+ fn eq(&self, other: &Self) -> bool {
+ match (self, other) {
+ (Self::Absent, Self::Absent) => true,
+ (Self::Materialized(a), Self::Materialized(b)) => a == b,
+ (Self::Uniform { value: v, count: n }, Self::Materialized(b))
+ | (Self::Materialized(b), Self::Uniform { value: v, count: n }) =>
{
+ b.len() == *n && b.iter().all(|x| x == v)
+ }
+ (
+ Self::Uniform {
+ value: v1,
+ count: n1,
+ },
+ Self::Uniform {
+ value: v2,
+ count: n2,
+ },
+ ) => v1 == v2 && n1 == n2,
+ _ => false,
+ }
+ }
+}
+
+impl Eq for LevelData {}
+
+impl LevelData {
+ fn new(present: bool) -> Self {
+ match present {
+ true => Self::Materialized(Vec::new()),
+ false => Self::Absent,
+ }
+ }
+
+ pub(crate) fn as_ref(&self) -> LevelDataRef<'_> {
+ match self {
+ Self::Absent => LevelDataRef::Absent,
+ Self::Materialized(values) => LevelDataRef::Materialized(values),
+ Self::Uniform { value, count } => LevelDataRef::Uniform {
+ value: *value,
+ count: *count,
+ },
+ }
+ }
+
+ pub(crate) fn slice(&self, offset: usize, len: usize) -> Self {
+ match self {
+ Self::Absent => Self::Absent,
+ Self::Materialized(values) =>
Self::Materialized(values[offset..offset + len].to_vec()),
+ Self::Uniform { value, .. } => Self::Uniform {
+ value: *value,
+ count: len,
+ },
+ }
+ }
+
+ fn append_run(&mut self, value: i16, count: usize) {
+ if count == 0 {
+ return;
+ }
+
+ match self {
+ Self::Absent => {}
+ Self::Materialized(values) if values.is_empty() => {
+ *self = Self::Uniform { value, count };
+ }
+ Self::Materialized(values) =>
values.extend(std::iter::repeat_n(value, count)),
+ Self::Uniform {
+ value: uniform_value,
+ count: uniform_count,
+ } if *uniform_value == value => {
+ *uniform_count += count;
+ }
+ Self::Uniform { .. } => {
+ let values = self.materialize_mut().unwrap();
+ values.extend(std::iter::repeat_n(value, count));
+ }
+ }
+ }
+
+ fn extend_from_iter<I>(&mut self, iter: I)
+ where
+ I: IntoIterator<Item = i16>,
+ {
+ if let Some(values) = self.materialize_mut() {
+ values.extend(iter);
+ }
+ }
+
+ fn materialize_mut(&mut self) -> Option<&mut Vec<i16>> {
Review Comment:
I think some comments here would also help -- namely that this forces Self
--> Self::Materialized and returns the inner Vec<i16>
##########
parquet/src/encodings/levels.rs:
##########
@@ -92,6 +92,33 @@ impl LevelEncoder {
}
}
+ /// Encode `count` repetitions of a single level value, calling
+ /// `observer(value, count)` exactly once.
+ ///
+ /// This is O(1) amortized for RLE-based encoders (after a small warmup).
+ #[inline]
+ pub fn put_n_with_observer<F>(&mut self, value: i16, count: usize, mut
observer: F)
+ where
+ F: FnMut(i16, usize),
+ {
+ match *self {
Review Comment:
Maybe as a cleanup in a following PR, we could reduce the indent level with
something like
```rust
let encoder = match *self {
LevelEncoder::Rle(ref mut encoder) | LevelEncoder::RleV2(ref mut
encoder) => encoder
};
...
```
I see this mirrors the structure above in `put_with_observer` and it makes
sense to keep them consistent
##########
parquet/src/column/writer/mod.rs:
##########
@@ -4397,4 +4499,152 @@ mod tests {
result.metadata.compressed_size()
);
}
+
+ /// Write-then-read roundtrip using `write_batch_internal` with the given
+ /// [`LevelDataRef`] variants, and assert the read-back matches
`expected_*`.
+ #[allow(clippy::too_many_arguments)]
+ fn column_roundtrip_uniform<T: DataType>(
+ props: WriterProperties,
+ values: &[T::T],
+ def_levels: LevelDataRef<'_>,
+ rep_levels: LevelDataRef<'_>,
+ max_def_level: i16,
+ max_rep_level: i16,
+ expected_values: &[T::T],
+ expected_def_levels: Option<&[i16]>,
+ expected_rep_levels: Option<&[i16]>,
+ ) where
+ T::T: PartialEq + std::fmt::Debug,
+ {
+ let mut file = tempfile::tempfile().unwrap();
+ let mut write = TrackedWrite::new(&mut file);
+ let page_writer = Box::new(SerializedPageWriter::new(&mut write));
+ let mut writer =
+ get_test_column_writer::<T>(page_writer, max_def_level,
max_rep_level, Arc::new(props));
+
+ writer
+ .write_batch_internal(values, None, def_levels, rep_levels, None,
None, None)
+ .unwrap();
+ let result = writer.close().unwrap();
+ drop(write);
+
+ let props = ReaderProperties::builder()
+ .set_backward_compatible_lz4(false)
+ .build();
+ let page_reader = Box::new(
+ SerializedPageReader::new_with_properties(
+ Arc::new(file),
+ &result.metadata,
+ result.rows_written as usize,
+ None,
+ Arc::new(props),
+ )
+ .unwrap(),
+ );
+ let mut reader = get_test_column_reader::<T>(page_reader,
max_def_level, max_rep_level);
+
+ let batch_size = expected_def_levels.map_or(expected_values.len(), |l|
l.len());
+ let mut actual_values = Vec::with_capacity(batch_size);
+ let mut actual_def = expected_def_levels.map(|_|
Vec::with_capacity(batch_size));
+ let mut actual_rep = expected_rep_levels.map(|_|
Vec::with_capacity(batch_size));
+
+ let (_, values_read, levels_read) = reader
+ .read_records(
+ batch_size,
+ actual_def.as_mut(),
+ actual_rep.as_mut(),
+ &mut actual_values,
+ )
+ .unwrap();
+
+ assert_eq!(&actual_values[..values_read], expected_values);
+ if let Some(ref v) = actual_def {
+ assert_eq!(&v[..levels_read], expected_def_levels.unwrap());
+ }
+ if let Some(ref v) = actual_rep {
+ assert_eq!(&v[..levels_read], expected_rep_levels.unwrap());
+ }
+ }
+
+ #[test]
+ fn test_uniform_def_levels_all_null() {
+ // All-null column: def_level=0 (null) for every slot, no values
written.
+ let max_def_level = 1;
+ let count = 100;
+ column_roundtrip_uniform::<Int32Type>(
+ Default::default(),
+ &[],
+ LevelDataRef::Uniform { value: 0, count },
+ LevelDataRef::Absent,
+ max_def_level,
+ 0,
+ &[],
+ Some(&vec![0i16; count]),
+ None,
+ );
Review Comment:
Similarly here I would find a fixture with named parts easier to follow so I
didn't have to manually match up argument locations. It would also avoid having
to specify optional arguments like `Default::default`
Instead of
```rust
column_roundtrip_uniform::<Int32Type>(
Default::default(),
&[],
LevelDataRef::Uniform { value: 0, count },
LevelDataRef::Absent,
max_def_level,
0,
&[],
Some(&vec![0i16; count]),
None,
);
```
Something like
```rust
ColumnRoundTripUniform::<Int32Type>::new()
.with_values(&[])
.with_def_levels(LevelDataRef::Uniform { value: 0, count })
.with_rep_levels(LevelDataRef::Absent)
.with_max_def_level(max_def_level)
.with_max_rep_level(0)
.with_expected_values(&[])
.with_expected_def_levels(Some(&vec![0i16; count]))
.with_expected_rep_levels(None)
.run()
```
##########
parquet/src/encodings/levels.rs:
##########
@@ -139,3 +166,116 @@ impl LevelEncoder {
result
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ /// Encode `count` repetitions of `value` using `put_with_observer` (the
+ /// already-tested slice-based path) and return the raw encoded bytes.
Review Comment:
"(the already-tested slice-based path)" I don't think adds a lot of value
after this PR (it is good in the context of this PR, but once it is merged this
will be confusing, I think).
##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -754,17 +734,129 @@ impl LevelInfoBuilder {
/// The data necessary to write a primitive Arrow array to parquet, taking
into account
/// any non-primitive parents it may have in the arrow representation
+#[derive(Debug, Clone)]
+pub(crate) enum LevelData {
+ Absent,
+ Materialized(Vec<i16>),
+ Uniform { value: i16, count: usize },
+}
+
+impl PartialEq for LevelData {
+ fn eq(&self, other: &Self) -> bool {
+ match (self, other) {
+ (Self::Absent, Self::Absent) => true,
+ (Self::Materialized(a), Self::Materialized(b)) => a == b,
+ (Self::Uniform { value: v, count: n }, Self::Materialized(b))
+ | (Self::Materialized(b), Self::Uniform { value: v, count: n }) =>
{
+ b.len() == *n && b.iter().all(|x| x == v)
+ }
+ (
+ Self::Uniform {
+ value: v1,
+ count: n1,
+ },
+ Self::Uniform {
+ value: v2,
+ count: n2,
+ },
+ ) => v1 == v2 && n1 == n2,
+ _ => false,
+ }
+ }
+}
+
+impl Eq for LevelData {}
+
+impl LevelData {
+ fn new(present: bool) -> Self {
+ match present {
+ true => Self::Materialized(Vec::new()),
+ false => Self::Absent,
+ }
+ }
+
+ pub(crate) fn as_ref(&self) -> LevelDataRef<'_> {
+ match self {
+ Self::Absent => LevelDataRef::Absent,
+ Self::Materialized(values) => LevelDataRef::Materialized(values),
+ Self::Uniform { value, count } => LevelDataRef::Uniform {
+ value: *value,
+ count: *count,
+ },
+ }
+ }
+
+ pub(crate) fn slice(&self, offset: usize, len: usize) -> Self {
+ match self {
+ Self::Absent => Self::Absent,
+ Self::Materialized(values) =>
Self::Materialized(values[offset..offset + len].to_vec()),
+ Self::Uniform { value, .. } => Self::Uniform {
+ value: *value,
+ count: len,
+ },
+ }
+ }
+
+ fn append_run(&mut self, value: i16, count: usize) {
Review Comment:
I think some comments here would help future readers
##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -839,12 +931,8 @@ impl ArrayLevels {
/// `non_null_indices`. The array is sliced to the range covered by
/// those indices, and they are shifted to be relative to the slice.
pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self {
- let def_levels = self.def_levels.as_ref().map(|levels| {
- levels[chunk.level_offset..chunk.level_offset +
chunk.num_levels].to_vec()
- });
- let rep_levels = self.rep_levels.as_ref().map(|levels| {
- levels[chunk.level_offset..chunk.level_offset +
chunk.num_levels].to_vec()
- });
+ let def_levels = self.def_levels.slice(chunk.level_offset,
chunk.num_levels);
Review Comment:
The new encapsulation makes this much easier to read I think -- so not only
does this PR make the code faster, I also think it makes it easier to follow ❤️
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]