This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 13f5f94064 feat(parquet): compact level representation with generic
writer dispatch (#9831)
13f5f94064 is described below
commit 13f5f940645fd17527a8397b296f04dc1f7da1d7
Author: Hippolyte Barraud <[email protected]>
AuthorDate: Thu May 7 13:16:51 2026 -0400
feat(parquet): compact level representation with generic writer dispatch
(#9831)
# Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax.
-->
- Spun off from #9653
- Contributes to #9731
# Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
See #9731
# What changes are included in this PR?
Represent definition and repetition levels as `LevelData`/`LevelDataRef`
with `Absent`, `Materialized`, and `Uniform` variants, and thread this
through Arrow level generation, CDC chunking, and the generic column
writer.
Uniform level runs, such as required fields and all-null pages, can now
be encoded without materializing dense `Vec<i16>` buffers. Add bulk run
support to `LevelEncoder`/`RleEncoder` so repeated levels are encoded in
amortized O(1) after the RLE warmup, while preserving histogram, row
count, null count, page splitting, and CDC chunk accounting.
# Are these changes tested?
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
All tests pass. Coverage exercises bulk RLE level encoding,
compact/uniform `LevelData` slicing and writer roundtrips across Parquet
v1/v2, and CDC/Arrow writer behavior including all-null and nested-level
cases.
# Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
If there are any breaking changes to public APIs, please call them out.
-->
None.
---------
Signed-off-by: Hippolyte Barraud <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
parquet/src/arrow/arrow_writer/levels.rs | 417 +++++++++++++++++-----------
parquet/src/arrow/arrow_writer/mod.rs | 34 ++-
parquet/src/column/chunker/cdc.rs | 104 +++++--
parquet/src/column/writer/mod.rs | 450 +++++++++++++++++++++++++++----
parquet/src/encodings/levels.rs | 144 ++++++++++
5 files changed, 905 insertions(+), 244 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/levels.rs
b/parquet/src/arrow/arrow_writer/levels.rs
index d23873278e..1f66de349d 100644
--- a/parquet/src/arrow/arrow_writer/levels.rs
+++ b/parquet/src/arrow/arrow_writer/levels.rs
@@ -41,6 +41,7 @@
//! \[1\]
[parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)
use crate::column::chunker::CdcChunk;
+use crate::column::writer::LevelDataRef;
use crate::errors::{ParquetError, Result};
use arrow_array::cast::AsArray;
use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
@@ -316,7 +317,7 @@ impl LevelInfoBuilder {
|child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
child.write(start_idx..end_idx);
child.visit_leaves(|leaf| {
- let rep_levels = leaf.rep_levels.as_mut().unwrap();
+ let rep_levels =
leaf.rep_levels.materialize_mut().unwrap();
let mut rev = rep_levels.iter_mut().rev();
let mut remaining = end_idx - start_idx;
@@ -336,17 +337,18 @@ impl LevelInfoBuilder {
})
};
+ // In a list column, each row falls into one of three categories:
+ // - "null": the list slot is absent (!is_valid), encoded at def_level
- 2
+ // - "empty": the list slot is present but has zero elements
+ // (offsets[i] == offsets[i+1]), encoded at def_level - 1
+ // - non-empty: the list slot has child values, which are recursed into
+ //
+ // Consecutive runs of null or empty rows are batched and written
together.
let write_null_run = |child: &mut LevelInfoBuilder, count: usize| {
if count > 0 {
child.visit_leaves(|leaf| {
- leaf.rep_levels
- .as_mut()
- .unwrap()
- .extend(std::iter::repeat_n(ctx.rep_level - 1, count));
- leaf.def_levels
- .as_mut()
- .unwrap()
- .extend(std::iter::repeat_n(ctx.def_level - 2, count));
+ leaf.append_rep_level_run(ctx.rep_level - 1, count);
+ leaf.append_def_level_run(ctx.def_level - 2, count);
});
}
};
@@ -354,14 +356,8 @@ impl LevelInfoBuilder {
let write_empty_run = |child: &mut LevelInfoBuilder, count: usize| {
if count > 0 {
child.visit_leaves(|leaf| {
- leaf.rep_levels
- .as_mut()
- .unwrap()
- .extend(std::iter::repeat_n(ctx.rep_level - 1, count));
- leaf.def_levels
- .as_mut()
- .unwrap()
- .extend(std::iter::repeat_n(ctx.def_level - 1, count));
+ leaf.append_rep_level_run(ctx.rep_level - 1, count);
+ leaf.append_def_level_run(ctx.def_level - 1, count);
});
}
};
@@ -431,7 +427,7 @@ impl LevelInfoBuilder {
|child: &mut LevelInfoBuilder, start_idx: usize, end_idx: usize| {
child.write(start_idx..end_idx);
child.visit_leaves(|leaf| {
- let rep_levels = leaf.rep_levels.as_mut().unwrap();
+ let rep_levels =
leaf.rep_levels.materialize_mut().unwrap();
let mut rev = rep_levels.iter_mut().rev();
let mut remaining = end_idx - start_idx;
@@ -453,19 +449,15 @@ impl LevelInfoBuilder {
let write_empty_slice = |child: &mut LevelInfoBuilder| {
child.visit_leaves(|leaf| {
- let rep_levels = leaf.rep_levels.as_mut().unwrap();
- rep_levels.push(ctx.rep_level - 1);
- let def_levels = leaf.def_levels.as_mut().unwrap();
- def_levels.push(ctx.def_level - 1);
+ leaf.append_rep_level_run(ctx.rep_level - 1, 1);
+ leaf.append_def_level_run(ctx.def_level - 1, 1);
})
};
let write_null_slice = |child: &mut LevelInfoBuilder| {
child.visit_leaves(|leaf| {
- let rep_levels = leaf.rep_levels.as_mut().unwrap();
- rep_levels.push(ctx.rep_level - 1);
- let def_levels = leaf.def_levels.as_mut().unwrap();
- def_levels.push(ctx.def_level - 2);
+ leaf.append_rep_level_run(ctx.rep_level - 1, 1);
+ leaf.append_def_level_run(ctx.def_level - 2, 1);
})
};
@@ -513,13 +505,8 @@ impl LevelInfoBuilder {
for child in children {
child.visit_leaves(|info| {
let len = range.end - range.start;
-
- let def_levels = info.def_levels.as_mut().unwrap();
- def_levels.extend(std::iter::repeat_n(ctx.def_level - 1,
len));
-
- if let Some(rep_levels) = info.rep_levels.as_mut() {
- rep_levels.extend(std::iter::repeat_n(ctx.rep_level,
len));
- }
+ info.append_def_level_run(ctx.def_level - 1, len);
+ info.append_rep_level_run(ctx.rep_level, len);
})
}
};
@@ -579,7 +566,7 @@ impl LevelInfoBuilder {
child.write(values_start..values_end);
child.visit_leaves(|leaf| {
- let rep_levels = leaf.rep_levels.as_mut().unwrap();
+ let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
let row_indices = (0..fixed_size)
.rev()
@@ -605,10 +592,8 @@ impl LevelInfoBuilder {
let write_empty = |child: &mut LevelInfoBuilder, start_idx: usize,
end_idx: usize| {
let len = end_idx - start_idx;
child.visit_leaves(|leaf| {
- let rep_levels = leaf.rep_levels.as_mut().unwrap();
- rep_levels.extend(std::iter::repeat_n(ctx.rep_level - 1, len));
- let def_levels = leaf.def_levels.as_mut().unwrap();
- def_levels.extend(std::iter::repeat_n(ctx.def_level - 1, len));
+ leaf.append_rep_level_run(ctx.rep_level - 1, len);
+ leaf.append_def_level_run(ctx.def_level - 1, len);
})
};
@@ -634,10 +619,8 @@ impl LevelInfoBuilder {
}
// Add null row
child.visit_leaves(|leaf| {
- let rep_levels = leaf.rep_levels.as_mut().unwrap();
- rep_levels.push(ctx.rep_level - 1);
- let def_levels = leaf.def_levels.as_mut().unwrap();
- def_levels.push(ctx.def_level - 2);
+ leaf.append_rep_level_run(ctx.rep_level - 1, 1);
+ leaf.append_def_level_run(ctx.def_level - 2, 1);
})
}
}
@@ -655,37 +638,34 @@ impl LevelInfoBuilder {
fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
let len = range.end - range.start;
- match &mut info.def_levels {
- Some(def_levels) => {
- def_levels.reserve(len);
- info.non_null_indices.reserve(len);
-
- match &info.logical_nulls {
- Some(nulls) => {
- assert!(range.end <= nulls.len());
- let nulls = nulls.inner();
- def_levels.extend(range.clone().map(|i| {
- // Safety: range.end was asserted to be in bounds
earlier
- let valid = unsafe { nulls.value_unchecked(i) };
- info.max_def_level - (!valid as i16)
- }));
- info.non_null_indices.extend(
- BitIndexIterator::new(nulls.inner(),
nulls.offset() + range.start, len)
- .map(|i| i + range.start),
- );
- }
- None => {
- let iter = std::iter::repeat_n(info.max_def_level,
len);
- def_levels.extend(iter);
- info.non_null_indices.extend(range);
- }
+ if matches!(info.def_levels, LevelData::Absent) {
+ info.non_null_indices.extend(range.clone());
+ } else {
+ let max_def_level = info.max_def_level;
+ match &info.logical_nulls {
+ Some(nulls) => {
+ assert!(range.end <= nulls.len());
+ let nulls = nulls.inner();
+ info.def_levels.extend_from_iter(range.clone().map(|i| {
+ // Safety: range.end was asserted to be in bounds
earlier
+ let valid = unsafe { nulls.value_unchecked(i) };
+ max_def_level - (!valid as i16)
+ }));
+ info.non_null_indices.reserve(len);
+ info.non_null_indices.extend(
+ BitIndexIterator::new(nulls.inner(), nulls.offset() +
range.start, len)
+ .map(|i| i + range.start),
+ );
+ }
+ None => {
+ info.append_def_level_run(max_def_level, len);
+ info.non_null_indices.extend(range.clone());
}
}
- None => info.non_null_indices.extend(range),
}
- if let Some(rep_levels) = &mut info.rep_levels {
- rep_levels.extend(std::iter::repeat_n(info.max_rep_level, len))
+ if !matches!(info.rep_levels, LevelData::Absent) {
+ info.append_rep_level_run(info.max_rep_level, len);
}
}
@@ -754,17 +734,142 @@ impl LevelInfoBuilder {
/// The data necessary to write a primitive Arrow array to parquet, taking
into account
/// any non-primitive parents it may have in the arrow representation
+#[derive(Debug, Clone)]
+pub(crate) enum LevelData {
+ Absent,
+ Materialized(Vec<i16>),
+ Uniform { value: i16, count: usize },
+}
+
+// Compare logical level contents rather than physical representation, so a
+// uniform run compares equal to the equivalent materialized buffer.
+impl PartialEq for LevelData {
+ fn eq(&self, other: &Self) -> bool {
+ match (self, other) {
+ (Self::Absent, Self::Absent) => true,
+ (Self::Materialized(a), Self::Materialized(b)) => a == b,
+ (Self::Uniform { value: v, count: n }, Self::Materialized(b))
+ | (Self::Materialized(b), Self::Uniform { value: v, count: n }) =>
{
+ b.len() == *n && b.iter().all(|x| x == v)
+ }
+ (
+ Self::Uniform {
+ value: v1,
+ count: n1,
+ },
+ Self::Uniform {
+ value: v2,
+ count: n2,
+ },
+ ) => v1 == v2 && n1 == n2,
+ _ => false,
+ }
+ }
+}
+
+impl Eq for LevelData {}
+
+impl LevelData {
+ fn new(present: bool) -> Self {
+ match present {
+ true => Self::Materialized(Vec::new()),
+ false => Self::Absent,
+ }
+ }
+
+ pub(crate) fn as_ref(&self) -> LevelDataRef<'_> {
+ match self {
+ Self::Absent => LevelDataRef::Absent,
+ Self::Materialized(values) => LevelDataRef::Materialized(values),
+ Self::Uniform { value, count } => LevelDataRef::Uniform {
+ value: *value,
+ count: *count,
+ },
+ }
+ }
+
+ pub(crate) fn slice(&self, offset: usize, len: usize) -> Self {
+ match self {
+ Self::Absent => Self::Absent,
+ Self::Materialized(values) =>
Self::Materialized(values[offset..offset + len].to_vec()),
+ Self::Uniform { value, .. } => Self::Uniform {
+ value: *value,
+ count: len,
+ },
+ }
+ }
+
+ fn append_run(&mut self, value: i16, count: usize) {
+ if count == 0 {
+ return;
+ }
+
+ match self {
+ // No physical level stream exists for this schema. Higher-level
+ // traversal may still append implicit levels, so this remains a
no-op.
+ Self::Absent => {}
+ // Start compact: the first appended run can be represented without
+ // allocating a level buffer.
+ Self::Materialized(values) if values.is_empty() => {
+ *self = Self::Uniform { value, count };
+ }
+ // Already materialized, so preserve the buffer representation and
append.
+ Self::Materialized(values) =>
values.extend(std::iter::repeat_n(value, count)),
+ // Preserve the compact representation while the appended run has
+ // the same value.
+ Self::Uniform {
+ value: uniform_value,
+ count: uniform_count,
+ } if *uniform_value == value => {
+ *uniform_count += count;
+ }
+ // A different value breaks the uniform representation. Materialize
+ // the existing run, then append the new run to the buffer.
+ Self::Uniform { .. } => {
+ let values = self.materialize_mut().unwrap();
+ values.extend(std::iter::repeat_n(value, count));
+ }
+ }
+ }
+
+ fn extend_from_iter<I>(&mut self, iter: I)
+ where
+ I: IntoIterator<Item = i16>,
+ {
+ if let Some(values) = self.materialize_mut() {
+ values.extend(iter);
+ }
+ }
+
+ /// Convert a uniform run into a materialized buffer if needed, then return
+ /// the mutable level buffer. Returns `None` when no physical level stream
exists.
+ fn materialize_mut(&mut self) -> Option<&mut Vec<i16>> {
+ match self {
+ Self::Absent => None,
+ Self::Materialized(values) => Some(values),
+ Self::Uniform { value, count } => {
+ let values = vec![*value; *count];
+ *self = Self::Materialized(values);
+ match self {
+ Self::Materialized(values) => Some(values),
+ _ => unreachable!(),
+ }
+ }
+ }
+ }
+}
+
#[derive(Debug, Clone)]
pub(crate) struct ArrayLevels {
/// Array's definition levels
///
/// Present if `max_def_level != 0`
- def_levels: Option<Vec<i16>>,
+ def_levels: LevelData,
/// Array's optional repetition levels
///
/// Present if `max_rep_level != 0`
- rep_levels: Option<Vec<i16>>,
+ rep_levels: LevelData,
/// The corresponding array identifying non-null slices of data
/// from the primitive array
@@ -807,8 +912,8 @@ impl ArrayLevels {
let logical_nulls = array.logical_nulls();
Self {
- def_levels: (max_def_level != 0).then(Vec::new),
- rep_levels: (max_rep_level != 0).then(Vec::new),
+ def_levels: LevelData::new(max_def_level != 0),
+ rep_levels: LevelData::new(max_rep_level != 0),
non_null_indices: vec![],
max_def_level,
max_rep_level,
@@ -821,12 +926,12 @@ impl ArrayLevels {
&self.array
}
- pub fn def_levels(&self) -> Option<&[i16]> {
- self.def_levels.as_deref()
+ pub(crate) fn def_level_data(&self) -> &LevelData {
+ &self.def_levels
}
- pub fn rep_levels(&self) -> Option<&[i16]> {
- self.rep_levels.as_deref()
+ pub(crate) fn rep_level_data(&self) -> &LevelData {
+ &self.rep_levels
}
pub fn non_null_indices(&self) -> &[usize] {
@@ -839,12 +944,8 @@ impl ArrayLevels {
/// `non_null_indices`. The array is sliced to the range covered by
/// those indices, and they are shifted to be relative to the slice.
pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self {
- let def_levels = self.def_levels.as_ref().map(|levels| {
- levels[chunk.level_offset..chunk.level_offset +
chunk.num_levels].to_vec()
- });
- let rep_levels = self.rep_levels.as_ref().map(|levels| {
- levels[chunk.level_offset..chunk.level_offset +
chunk.num_levels].to_vec()
- });
+ let def_levels = self.def_levels.slice(chunk.level_offset,
chunk.num_levels);
+ let rep_levels = self.rep_levels.slice(chunk.level_offset,
chunk.num_levels);
// Select the non-null indices for this chunk.
let nni =
&self.non_null_indices[chunk.value_offset..chunk.value_offset +
chunk.num_values];
@@ -870,6 +971,14 @@ impl ArrayLevels {
logical_nulls,
}
}
+
+ fn append_def_level_run(&mut self, value: i16, count: usize) {
+ self.def_levels.append_run(value, count);
+ }
+
+ fn append_rep_level_run(&mut self, value: i16, count: usize) {
+ self.rep_levels.append_run(value, count);
+ }
}
#[cfg(test)]
@@ -920,8 +1029,8 @@ mod tests {
assert_eq!(levels.len(), 1);
let expected = ArrayLevels {
- def_levels: Some(vec![2; 10]),
- rep_levels: Some(vec![0, 2, 2, 1, 2, 2, 2, 0, 1, 2]),
+ def_levels: LevelData::Materialized(vec![2; 10]),
+ rep_levels: LevelData::Materialized(vec![0, 2, 2, 1, 2, 2, 2, 0,
1, 2]),
non_null_indices: vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
max_def_level: 2,
max_rep_level: 2,
@@ -941,8 +1050,8 @@ mod tests {
assert_eq!(levels.len(), 1);
let expected_levels = ArrayLevels {
- def_levels: None,
- rep_levels: None,
+ def_levels: LevelData::Absent,
+ rep_levels: LevelData::Absent,
non_null_indices: (0..10).collect(),
max_def_level: 0,
max_rep_level: 0,
@@ -969,8 +1078,8 @@ mod tests {
let logical_nulls = array.logical_nulls();
let expected_levels = ArrayLevels {
- def_levels: Some(vec![1, 0, 1, 1, 0]),
- rep_levels: None,
+ def_levels: LevelData::Materialized(vec![1, 0, 1, 1, 0]),
+ rep_levels: LevelData::Absent,
non_null_indices: vec![0, 2, 3],
max_def_level: 1,
max_rep_level: 0,
@@ -1004,8 +1113,8 @@ mod tests {
assert_eq!(levels.len(), 1);
let expected_levels = ArrayLevels {
- def_levels: Some(vec![1; 5]),
- rep_levels: Some(vec![0; 5]),
+ def_levels: LevelData::Materialized(vec![1; 5]),
+ rep_levels: LevelData::Materialized(vec![0; 5]),
non_null_indices: (0..5).collect(),
max_def_level: 1,
max_rep_level: 1,
@@ -1038,8 +1147,8 @@ mod tests {
assert_eq!(levels.len(), 1);
let expected_levels = ArrayLevels {
- def_levels: Some(vec![2, 2, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2]),
- rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1]),
+ def_levels: LevelData::Materialized(vec![2, 2, 0, 2, 2, 2, 2, 2,
2, 2, 2, 2]),
+ rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 1, 0, 1, 1,
1, 0, 1, 1]),
non_null_indices: (0..11).collect(),
max_def_level: 2,
max_rep_level: 1,
@@ -1088,8 +1197,8 @@ mod tests {
assert_eq!(levels.len(), 1);
let expected_levels = ArrayLevels {
- def_levels: Some(vec![0, 2, 0, 3, 3, 3, 3, 3, 3, 3]),
- rep_levels: Some(vec![0, 0, 0, 0, 1, 1, 1, 0, 1, 1]),
+ def_levels: LevelData::Materialized(vec![0, 2, 0, 3, 3, 3, 3, 3,
3, 3]),
+ rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 1, 1, 1, 0,
1, 1]),
non_null_indices: (4..11).collect(),
max_def_level: 3,
max_rep_level: 1,
@@ -1135,10 +1244,10 @@ mod tests {
assert_eq!(levels.len(), 1);
let expected_levels = ArrayLevels {
- def_levels: Some(vec![
+ def_levels: LevelData::Materialized(vec![
5, 5, 5, 5, 1, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,
5, 5,
]),
- rep_levels: Some(vec![
+ rep_levels: LevelData::Materialized(vec![
0, 2, 1, 2, 0, 0, 2, 1, 2, 0, 2, 1, 2, 1, 2, 1, 2, 0, 2, 1, 2,
1, 2,
]),
non_null_indices: (0..22).collect(),
@@ -1177,8 +1286,8 @@ mod tests {
assert_eq!(levels.len(), 1);
let expected_levels = ArrayLevels {
- def_levels: Some(vec![1; 4]),
- rep_levels: Some(vec![0; 4]),
+ def_levels: LevelData::Materialized(vec![1; 4]),
+ rep_levels: LevelData::Materialized(vec![0; 4]),
non_null_indices: (0..4).collect(),
max_def_level: 1,
max_rep_level: 1,
@@ -1210,8 +1319,8 @@ mod tests {
assert_eq!(levels.len(), 1);
let expected_levels = ArrayLevels {
- def_levels: Some(vec![1, 3, 3, 3, 3, 3, 3, 3]),
- rep_levels: Some(vec![0, 0, 1, 1, 0, 1, 0, 1]),
+ def_levels: LevelData::Materialized(vec![1, 3, 3, 3, 3, 3, 3, 3]),
+ rep_levels: LevelData::Materialized(vec![0, 0, 1, 1, 0, 1, 0, 1]),
non_null_indices: (0..7).collect(),
max_def_level: 3,
max_rep_level: 1,
@@ -1259,8 +1368,12 @@ mod tests {
assert_eq!(levels.len(), 1);
let expected_levels = ArrayLevels {
- def_levels: Some(vec![1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5,
5, 5, 5]),
- rep_levels: Some(vec![0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2,
2, 2, 2]),
+ def_levels: LevelData::Materialized(vec![
+ 1, 5, 5, 5, 4, 5, 5, 5, 5, 5, 5, 5, 4, 5, 5, 5, 5, 5,
+ ]),
+ rep_levels: LevelData::Materialized(vec![
+ 0, 0, 1, 2, 1, 0, 2, 2, 1, 2, 2, 2, 0, 1, 2, 2, 2, 2,
+ ]),
non_null_indices: (0..15).collect(),
max_def_level: 5,
max_rep_level: 2,
@@ -1300,8 +1413,8 @@ mod tests {
let logical_nulls = leaf.logical_nulls();
let expected_levels = ArrayLevels {
- def_levels: Some(vec![3, 2, 3, 1, 0, 3]),
- rep_levels: None,
+ def_levels: LevelData::Materialized(vec![3, 2, 3, 1, 0, 3]),
+ rep_levels: LevelData::Absent,
non_null_indices: vec![0, 2, 5],
max_def_level: 3,
max_rep_level: 0,
@@ -1340,8 +1453,8 @@ mod tests {
let list_level = &levels[0];
let expected_level = ArrayLevels {
- def_levels: Some(vec![0, 3, 3, 3]),
- rep_levels: Some(vec![0, 0, 1, 1]),
+ def_levels: LevelData::Materialized(vec![0, 3, 3, 3]),
+ rep_levels: LevelData::Materialized(vec![0, 0, 1, 1]),
non_null_indices: vec![3, 4, 5],
max_def_level: 3,
max_rep_level: 1,
@@ -1433,8 +1546,8 @@ mod tests {
let list_level = &levels[0];
let expected_level = ArrayLevels {
- def_levels: None,
- rep_levels: None,
+ def_levels: LevelData::Absent,
+ rep_levels: LevelData::Absent,
non_null_indices: vec![0, 1, 2, 3, 4],
max_def_level: 0,
max_rep_level: 0,
@@ -1448,8 +1561,8 @@ mod tests {
let b_logical_nulls = b.logical_nulls();
let expected_level = ArrayLevels {
- def_levels: Some(vec![1, 0, 0, 1, 1]),
- rep_levels: None,
+ def_levels: LevelData::Materialized(vec![1, 0, 0, 1, 1]),
+ rep_levels: LevelData::Absent,
non_null_indices: vec![0, 3, 4],
max_def_level: 1,
max_rep_level: 0,
@@ -1463,8 +1576,8 @@ mod tests {
let d_logical_nulls = d.logical_nulls();
let expected_level = ArrayLevels {
- def_levels: Some(vec![1, 1, 1, 2, 1]),
- rep_levels: None,
+ def_levels: LevelData::Materialized(vec![1, 1, 1, 2, 1]),
+ rep_levels: LevelData::Absent,
non_null_indices: vec![3],
max_def_level: 2,
max_rep_level: 0,
@@ -1478,8 +1591,8 @@ mod tests {
let f_logical_nulls = f.logical_nulls();
let expected_level = ArrayLevels {
- def_levels: Some(vec![3, 2, 3, 2, 3]),
- rep_levels: None,
+ def_levels: LevelData::Materialized(vec![3, 2, 3, 2, 3]),
+ rep_levels: LevelData::Absent,
non_null_indices: vec![0, 2, 4],
max_def_level: 3,
max_rep_level: 0,
@@ -1586,8 +1699,8 @@ mod tests {
let list_level = &levels[0];
let expected_level = ArrayLevels {
- def_levels: Some(vec![1; 7]),
- rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
+ def_levels: LevelData::Materialized(vec![1; 7]),
+ rep_levels: LevelData::Materialized(vec![0, 1, 0, 1, 0, 1, 1]),
non_null_indices: vec![0, 1, 2, 3, 4, 5, 6],
max_def_level: 1,
max_rep_level: 1,
@@ -1601,8 +1714,8 @@ mod tests {
let map_values_logical_nulls = map.values().logical_nulls();
let expected_level = ArrayLevels {
- def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]),
- rep_levels: Some(vec![0, 1, 0, 1, 0, 1, 1]),
+ def_levels: LevelData::Materialized(vec![2, 2, 2, 1, 2, 1, 2]),
+ rep_levels: LevelData::Materialized(vec![0, 1, 0, 1, 0, 1, 1]),
non_null_indices: vec![0, 1, 2, 4, 6],
max_def_level: 2,
max_rep_level: 1,
@@ -1688,8 +1801,8 @@ mod tests {
let logical_nulls = values.logical_nulls();
let expected_level = ArrayLevels {
- def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]),
- rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]),
+ def_levels: LevelData::Materialized(vec![4, 1, 0, 2, 2, 3, 4]),
+ rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 1, 0, 0]),
non_null_indices: vec![0, 4],
max_def_level: 4,
max_rep_level: 1,
@@ -1730,8 +1843,8 @@ mod tests {
let logical_nulls = values.logical_nulls();
let expected_level = ArrayLevels {
- def_levels: Some(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
- rep_levels: Some(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
+ def_levels: LevelData::Materialized(vec![4, 4, 3, 2, 0, 4, 4, 0,
1]),
+ rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 0, 0, 1, 0,
0]),
non_null_indices: vec![0, 1, 5, 6],
max_def_level: 4,
max_rep_level: 1,
@@ -1817,8 +1930,8 @@ mod tests {
let a1_logical_nulls = a1_values.logical_nulls();
let expected_level = ArrayLevels {
- def_levels: Some(vec![0, 0, 1, 6, 5, 2, 3, 1]),
- rep_levels: Some(vec![0, 0, 0, 0, 2, 0, 1, 0]),
+ def_levels: LevelData::Materialized(vec![0, 0, 1, 6, 5, 2, 3, 1]),
+ rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 2, 0, 1, 0]),
non_null_indices: vec![1],
max_def_level: 6,
max_rep_level: 2,
@@ -1830,8 +1943,8 @@ mod tests {
let a2_logical_nulls = a2_values.logical_nulls();
let expected_level = ArrayLevels {
- def_levels: Some(vec![0, 0, 1, 3, 2, 4, 1]),
- rep_levels: Some(vec![0, 0, 0, 0, 0, 1, 0]),
+ def_levels: LevelData::Materialized(vec![0, 0, 1, 3, 2, 4, 1]),
+ rep_levels: LevelData::Materialized(vec![0, 0, 0, 0, 0, 1, 0]),
non_null_indices: vec![4],
max_def_level: 4,
max_rep_level: 1,
@@ -1870,8 +1983,8 @@ mod tests {
let logical_nulls = values.logical_nulls();
let expected_level = ArrayLevels {
- def_levels: Some(vec![0, 0, 3, 3]),
- rep_levels: Some(vec![0, 0, 0, 1]),
+ def_levels: LevelData::Materialized(vec![0, 0, 3, 3]),
+ rep_levels: LevelData::Materialized(vec![0, 0, 0, 1]),
non_null_indices: vec![6, 7],
max_def_level: 3,
max_rep_level: 1,
@@ -2022,8 +2135,8 @@ mod tests {
// [[{a: 1}, null], null, [null, null], [{a: null}, {a: 2}]]
let values_a_logical_nulls = values_a.logical_nulls();
let expected_a = ArrayLevels {
- def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]),
- rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
+ def_levels: LevelData::Materialized(vec![4, 2, 0, 2, 2, 3, 4]),
+ rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 1, 0, 1]),
non_null_indices: vec![0, 7],
max_def_level: 4,
max_rep_level: 1,
@@ -2033,8 +2146,8 @@ mod tests {
// [[{b: 2}, null], null, [null, null], [{b: 3}, {b: 4}]]
let values_b_logical_nulls = values_b.logical_nulls();
let expected_b = ArrayLevels {
- def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]),
- rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
+ def_levels: LevelData::Materialized(vec![3, 2, 0, 2, 2, 3, 3]),
+ rep_levels: LevelData::Materialized(vec![0, 1, 0, 0, 1, 0, 1]),
non_null_indices: vec![0, 6, 7],
max_def_level: 3,
max_rep_level: 1,
@@ -2066,8 +2179,8 @@ mod tests {
let logical_nulls = values.logical_nulls();
let expected_level = ArrayLevels {
- def_levels: Some(vec![1, 0, 1]),
- rep_levels: Some(vec![0, 0, 0]),
+ def_levels: LevelData::Materialized(vec![1, 0, 1]),
+ rep_levels: LevelData::Materialized(vec![0, 0, 0]),
non_null_indices: vec![],
max_def_level: 3,
max_rep_level: 1,
@@ -2103,8 +2216,8 @@ mod tests {
let logical_nulls = values.logical_nulls();
let expected_level = ArrayLevels {
- def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
- rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
+ def_levels: LevelData::Materialized(vec![5, 4, 5, 2, 5, 3, 5, 5,
4, 4, 0]),
+ rep_levels: LevelData::Materialized(vec![0, 2, 2, 1, 0, 1, 0, 2,
1, 2, 0]),
non_null_indices: vec![0, 2, 3, 4, 5],
max_def_level: 5,
max_rep_level: 2,
@@ -2136,8 +2249,8 @@ mod tests {
let logical_nulls = dict.logical_nulls();
let expected_level = ArrayLevels {
- def_levels: Some(vec![0, 0, 1, 1]),
- rep_levels: None,
+ def_levels: LevelData::Materialized(vec![0, 0, 1, 1]),
+ rep_levels: LevelData::Absent,
non_null_indices: vec![2, 3],
max_def_level: 1,
max_rep_level: 0,
@@ -2176,8 +2289,8 @@ mod tests {
let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5,
6]));
let logical_nulls = array.logical_nulls();
let levels = ArrayLevels {
- def_levels: None,
- rep_levels: None,
+ def_levels: LevelData::Absent,
+ rep_levels: LevelData::Absent,
non_null_indices: vec![0, 1, 2, 3, 4, 5],
max_def_level: 0,
max_rep_level: 0,
@@ -2190,8 +2303,8 @@ mod tests {
value_offset: 2,
num_values: 3,
});
- assert!(sliced.def_levels.is_none());
- assert!(sliced.rep_levels.is_none());
+ assert!(matches!(sliced.def_levels, LevelData::Absent));
+ assert!(matches!(sliced.rep_levels, LevelData::Absent));
assert_eq!(sliced.non_null_indices, vec![0, 1, 2]);
assert_eq!(sliced.array.len(), 3);
@@ -2210,8 +2323,8 @@ mod tests {
]));
let logical_nulls = array.logical_nulls();
let levels = ArrayLevels {
- def_levels: Some(vec![1, 0, 1, 0, 1, 1]),
- rep_levels: None,
+ def_levels: LevelData::Materialized(vec![1, 0, 1, 0, 1, 1]),
+ rep_levels: LevelData::Absent,
non_null_indices: vec![0, 2, 4, 5],
max_def_level: 1,
max_rep_level: 0,
@@ -2224,8 +2337,8 @@ mod tests {
value_offset: 1,
num_values: 1,
});
- assert_eq!(sliced.def_levels, Some(vec![0, 1, 0]));
- assert!(sliced.rep_levels.is_none());
+ assert_eq!(sliced.def_levels, LevelData::Materialized(vec![0, 1, 0]));
+ assert!(matches!(sliced.rep_levels, LevelData::Absent));
assert_eq!(sliced.non_null_indices, vec![0]); // [2] shifted by -2
(nni[0])
assert_eq!(sliced.array.len(), 1);
}
@@ -2264,8 +2377,8 @@ mod tests {
]));
let logical_nulls = array.logical_nulls();
let levels = ArrayLevels {
- def_levels: Some(vec![3, 0, 3, 2, 0, 3, 3]),
- rep_levels: Some(vec![0, 0, 0, 1, 0, 0, 1]),
+ def_levels: LevelData::Materialized(vec![3, 0, 3, 2, 0, 3, 3]),
+ rep_levels: LevelData::Materialized(vec![0, 0, 0, 1, 0, 0, 1]),
non_null_indices: vec![0, 3, 8, 9],
max_def_level: 3,
max_rep_level: 1,
@@ -2310,8 +2423,8 @@ mod tests {
let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None,
None, Some(4)]));
let logical_nulls = array.logical_nulls();
let levels = ArrayLevels {
- def_levels: Some(vec![1, 0, 0, 1]),
- rep_levels: None,
+ def_levels: LevelData::Materialized(vec![1, 0, 0, 1]),
+ rep_levels: LevelData::Absent,
non_null_indices: vec![0, 3],
max_def_level: 1,
max_rep_level: 0,
@@ -2325,7 +2438,7 @@ mod tests {
value_offset: 1,
num_values: 0,
});
- assert_eq!(sliced.def_levels, Some(vec![0, 0]));
+ assert_eq!(sliced.def_levels, LevelData::Materialized(vec![0, 0]));
assert_eq!(sliced.non_null_indices, Vec::<usize>::new());
assert_eq!(sliced.array.len(), 0);
}
diff --git a/parquet/src/arrow/arrow_writer/mod.rs
b/parquet/src/arrow/arrow_writer/mod.rs
index 641b81257a..b93e174401 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -920,8 +920,11 @@ impl ArrowColumnWriter {
chunker: &mut ContentDefinedChunker,
) -> Result<()> {
let levels = &col.0;
- let chunks =
- chunker.get_arrow_chunks(levels.def_levels(), levels.rep_levels(),
levels.array())?;
+ let chunks = chunker.get_arrow_chunks(
+ levels.def_level_data().as_ref(),
+ levels.rep_level_data().as_ref(),
+ levels.array(),
+ )?;
let num_chunks = chunks.len();
for (i, chunk) in chunks.iter().enumerate() {
@@ -1384,10 +1387,15 @@ fn write_leaf(
}
ColumnWriter::BoolColumnWriter(typed) => {
let array = column.as_boolean();
- typed.write_batch(
- get_bool_array_slice(array, indices).as_slice(),
- levels.def_levels(),
- levels.rep_levels(),
+ let values = get_bool_array_slice(array, indices);
+ typed.write_batch_internal(
+ values.as_slice(),
+ None,
+ levels.def_level_data().as_ref(),
+ levels.rep_level_data().as_ref(),
+ None,
+ None,
+ None,
)
}
ColumnWriter::Int64ColumnWriter(typed) => {
@@ -1538,7 +1546,15 @@ fn write_leaf(
));
}
};
- typed.write_batch(bytes.as_slice(), levels.def_levels(),
levels.rep_levels())
+ typed.write_batch_internal(
+ bytes.as_slice(),
+ None,
+ levels.def_level_data().as_ref(),
+ levels.rep_level_data().as_ref(),
+ None,
+ None,
+ None,
+ )
}
}
}
@@ -1551,8 +1567,8 @@ fn write_primitive<E: ColumnValueEncoder>(
writer.write_batch_internal(
values,
Some(levels.non_null_indices()),
- levels.def_levels(),
- levels.rep_levels(),
+ levels.def_level_data().as_ref(),
+ levels.rep_level_data().as_ref(),
None,
None,
None,
diff --git a/parquet/src/column/chunker/cdc.rs
b/parquet/src/column/chunker/cdc.rs
index cb4328536d..b40dd74a8d 100644
--- a/parquet/src/column/chunker/cdc.rs
+++ b/parquet/src/column/chunker/cdc.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#[cfg(feature = "arrow")]
+use crate::column::writer::LevelDataRef;
use crate::errors::{ParquetError, Result};
use crate::file::properties::CdcOptions;
use crate::schema::types::ColumnDescriptor;
@@ -275,8 +277,8 @@ impl ContentDefinedChunker {
/// evaluate if we need to create a new chunk.
fn calculate<F>(
&mut self,
- def_levels: Option<&[i16]>,
- rep_levels: Option<&[i16]>,
+ def_levels: LevelDataRef<'_>,
+ rep_levels: LevelDataRef<'_>,
num_levels: usize,
mut roll_value: F,
) -> Vec<CdcChunk>
@@ -322,10 +324,11 @@ impl ContentDefinedChunker {
// def_levels: [1, 0, 1, 0, 1]
// level: 0 1 2 3 4
// value_offset: 0 1 2 (only increments on def==1)
- let def_levels = def_levels.expect("def_levels required when
max_def_level > 0");
#[allow(clippy::needless_range_loop)]
for offset in 0..num_levels {
- let def_level = def_levels[offset];
+ let def_level = def_levels
+ .value_at(offset) // None if absent or offset is past the end
+ .expect("def_levels required when max_def_level > 0");
self.roll_level(def_level);
if def_level == self.max_def_level {
// For non-nested data, the leaf array has one slot per
@@ -380,13 +383,15 @@ impl ContentDefinedChunker {
// Using value_offset=1 would index position 1 (the null slot).
//
// Using value_offset for roll_value would hash the wrong array
slot.
- let def_levels = def_levels.expect("def_levels required for nested
data");
- let rep_levels = rep_levels.expect("rep_levels required for nested
data");
let mut leaf_offset: usize = 0;
for offset in 0..num_levels {
- let def_level = def_levels[offset];
- let rep_level = rep_levels[offset];
+ let def_level = def_levels
+ .value_at(offset) // None if absent or offset is past the end
+ .expect("def_levels required for nested data");
+ let rep_level = rep_levels
+ .value_at(offset) // None if absent or offset is past the end
+ .expect("rep_levels required for nested data");
self.roll_level(def_level);
self.roll_level(rep_level);
@@ -439,16 +444,20 @@ impl ContentDefinedChunker {
#[cfg(feature = "arrow")]
pub(crate) fn get_arrow_chunks(
&mut self,
- def_levels: Option<&[i16]>,
- rep_levels: Option<&[i16]>,
+ def_levels: LevelDataRef<'_>,
+ rep_levels: LevelDataRef<'_>,
array: &dyn arrow_array::Array,
) -> Result<Vec<CdcChunk>> {
use arrow_array::cast::AsArray;
use arrow_schema::DataType;
- let num_levels = match def_levels {
- Some(def_levels) => def_levels.len(),
- None => array.len(),
+ // For nested (list) data, null list entries can own non-zero child
+ // ranges in the leaf array, so `array.len()` may exceed the number of
+ // levels. Always drive the loop by the level count; fall back to the
+ // array length only when there are no levels at all.
+ let num_levels = match (def_levels.len(), rep_levels.len()) {
+ (0, 0) => array.len(), // also taken when levels are present but empty
+ (d, r) => d.max(r), // NOTE(review): d == r is expected when both are present
};
macro_rules! fixed_width {
@@ -566,6 +575,8 @@ impl ContentDefinedChunker {
mod tests {
use super::*;
use crate::basic::Type as PhysicalType;
+ #[cfg(feature = "arrow")]
+ use crate::column::writer::LevelDataRef;
use crate::schema::types::{ColumnPath, Type};
use std::sync::Arc;
@@ -615,9 +626,14 @@ mod tests {
// Write a small amount of data — should produce exactly 1 chunk.
let num_values = 4;
- let chunks = chunker.calculate(None, None, num_values, |c, i| {
- c.roll_fixed::<4>(&(i as i32).to_le_bytes());
- });
+ let chunks = chunker.calculate(
+ LevelDataRef::Absent, // no def levels
+ LevelDataRef::Absent, // no rep levels
+ num_values,
+ |c, i| {
+ c.roll_fixed::<4>(&(i as i32).to_le_bytes());
+ },
+ );
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].level_offset, 0);
assert_eq!(chunks[0].value_offset, 0);
@@ -636,9 +652,14 @@ mod tests {
// Write enough data to exceed max_chunk_size multiple times.
// Each i32 = 4 bytes, max_chunk_size=1024, so ~256 values per chunk
max.
let num_values = 2000;
- let chunks = chunker.calculate(None, None, num_values, |c, i| {
- c.roll_fixed::<4>(&(i as i32).to_le_bytes());
- });
+ let chunks = chunker.calculate(
+ LevelDataRef::Absent, // no def levels
+ LevelDataRef::Absent, // no rep levels
+ num_values,
+ |c, i| {
+ c.roll_fixed::<4>(&(i as i32).to_le_bytes());
+ },
+ );
// Should have multiple chunks
assert!(chunks.len() > 1);
@@ -668,10 +689,10 @@ mod tests {
};
let mut chunker1 = ContentDefinedChunker::new(&make_desc(0, 0),
&options).unwrap();
- let chunks1 = chunker1.calculate(None, None, 200, roll);
+ let chunks1 = chunker1.calculate(LevelDataRef::Absent,
LevelDataRef::Absent, 200, roll); // no def/rep levels
let mut chunker2 = ContentDefinedChunker::new(&make_desc(0, 0),
&options).unwrap();
- let chunks2 = chunker2.calculate(None, None, 200, roll);
+ let chunks2 = chunker2.calculate(LevelDataRef::Absent,
LevelDataRef::Absent, 200, roll); // same input on a fresh chunker
assert_eq!(chunks1.len(), chunks2.len());
for (a, b) in chunks1.iter().zip(chunks2.iter()) {
@@ -699,9 +720,14 @@ mod tests {
.collect();
let expected_non_null: usize = def_levels.iter().filter(|&&d| d ==
1).count();
- let chunks = chunker.calculate(Some(&def_levels), None, num_levels,
|c, i| {
- c.roll_fixed::<4>(&(i as i32).to_le_bytes());
- });
+ let chunks = chunker.calculate(
+ LevelDataRef::Materialized(&def_levels), // explicit per-slot def levels
+ LevelDataRef::Absent, // no rep levels
+ num_levels,
+ |c, i| {
+ c.roll_fixed::<4>(&(i as i32).to_le_bytes());
+ },
+ );
assert!(!chunks.is_empty());
let total_levels: usize = chunks.iter().map(|c| c.num_levels).sum();
@@ -729,6 +755,7 @@ mod arrow_tests {
use crate::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use crate::arrow::arrow_writer::ArrowWriter;
+ use crate::column::writer::LevelDataRef;
use crate::file::properties::{CdcOptions, WriterProperties};
use crate::file::reader::{FileReader, SerializedFileReader};
@@ -1109,6 +1136,27 @@ mod arrow_tests {
buf
}
+ #[test] // NOTE(review): presumably routes all-null def levels through the Uniform path; confirm
+ fn cdc_all_null_arrow_column_writes_data_pages() {
+ let array = Arc::new(Int32Array::from(vec![None::<i32>; 4096])) as
ArrayRef;
+ let schema = Arc::new(Schema::new(vec![Field::new("f0",
DataType::Int32, true)]));
+ let batch = RecordBatch::try_new(schema, vec![array.clone()]).unwrap();
+
+ let data = write_with_cdc_options(&[&batch], 64, 256, Some(4096),
false);
+ let info = get_column_info(&data, 0); // page info for column 0
+
+ assert_eq!(info.len(), 1);
+ assert!(
+ !info[0].page_lengths.is_empty(),
+ "all-null CDC write should still emit data pages"
+ );
+ assert_eq!(
+ info[0].page_lengths.iter().sum::<i64>(),
+ array.len() as i64,
+ "all-null CDC pages should account for every input row"
+ );
+ }
+
fn read_batches(data: &[u8]) -> Vec<RecordBatch> {
let reader =
ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(data.to_vec()))
.unwrap()
@@ -2190,11 +2238,15 @@ mod arrow_tests {
let array: Int32Array = (0..n).map(|i| test_hash(0, i as u64) as
i32).collect();
let mut chunker = super::ContentDefinedChunker::new(&desc,
&options).unwrap();
- let chunks = chunker.get_arrow_chunks(None, None, &array).unwrap();
+ let chunks = chunker
+ .get_arrow_chunks(LevelDataRef::Absent, LevelDataRef::Absent,
&array)
+ .unwrap(); // no levels: num_levels falls back to array.len()
let sliced = array.slice(offset, n - offset);
let mut chunker2 = super::ContentDefinedChunker::new(&desc,
&options).unwrap();
- let chunks2 = chunker2.get_arrow_chunks(None, None, &sliced).unwrap();
+ let chunks2 = chunker2
+ .get_arrow_chunks(LevelDataRef::Absent, LevelDataRef::Absent,
&sliced)
+ .unwrap(); // no levels: num_levels falls back to sliced.len()
let values: Vec<usize> = chunks.iter().map(|c| c.num_values).collect();
let values2: Vec<usize> = chunks2.iter().map(|c|
c.num_values).collect();
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index f755beed55..5d14ac6856 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -314,6 +314,85 @@ impl<T: Default> ColumnMetrics<T> {
}
}
+/// Borrowed view of level data, analogous to `&str` for `LevelData`'s
`String`.
+///
+/// `LevelDataRef` can be constructed from `LevelData` and directly from an
existing
+/// `&[i16]` without allocating.
+///
+/// The variants are different physical representations of the same logical
+/// sequence of levels.
+#[derive(Debug, Clone, Copy)]
+pub(crate) enum LevelDataRef<'a> {
+ /// No levels were supplied.
+ Absent,
+ /// One explicit level per slot.
+ Materialized(&'a [i16]),
+ /// `count` copies of `value`, held without a backing buffer.
+ Uniform { value: i16, count: usize },
+}
+
+impl<'a> From<&'a [i16]> for LevelDataRef<'a> {
+ fn from(levels: &'a [i16]) -> Self {
+ Self::Materialized(levels)
+ }
+}
+
+impl<'a> From<Option<&'a [i16]>> for LevelDataRef<'a> {
+ fn from(levels: Option<&'a [i16]>) -> Self {
+ levels.map_or(Self::Absent, Self::from)
+ }
+}
+
+impl<'a> LevelDataRef<'a> {
+ /// Number of levels represented (`0` for `Absent`).
+ pub(crate) fn len(self) -> usize {
+ match self {
+ Self::Absent => 0,
+ Self::Materialized(values) => values.len(),
+ Self::Uniform { count, .. } => count,
+ }
+ }
+
+ /// First level, or `None` when absent or empty.
+ pub(crate) fn first(self) -> Option<i16> {
+ match self {
+ Self::Absent => None,
+ Self::Materialized(values) => values.first().copied(),
+ Self::Uniform { value, count } => (count > 0).then_some(value),
+ }
+ }
+
+ /// Level at `idx`, or `None` when absent or `idx` is out of range.
+ #[cfg(feature = "arrow")]
+ pub(crate) fn value_at(self, idx: usize) -> Option<i16> {
+ match self {
+ Self::Absent => None,
+ Self::Materialized(values) => values.get(idx).copied(),
+ Self::Uniform { value, count } => (idx < count).then_some(value),
+ }
+ }
+
+ /// Sub-range of `len` levels starting at `offset`.
+ ///
+ /// # Panics
+ ///
+ /// Panics if the range exceeds a `Materialized` slice. For `Uniform` the
+ /// same bound is checked by a debug assertion.
+ pub(crate) fn slice(self, offset: usize, len: usize) -> Self {
+ match self {
+ Self::Absent => Self::Absent,
+ Self::Materialized(values) =>
Self::Materialized(&values[offset..offset + len]),
+ Self::Uniform { value, count } => {
+ debug_assert!(
+ offset + len <= count,
+ "uniform level slice out of range: {offset} + {len} > {count}"
+ );
+ Self::Uniform { value, count: len }
+ }
+ }
+ }
+}
+
/// Typed column writer for a primitive column.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a,
ColumnValueEncoderImpl<T>>;
@@ -424,21 +485,24 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a,
E> {
&mut self,
values: &E::Values,
value_indices: Option<&[usize]>,
- def_levels: Option<&[i16]>,
- rep_levels: Option<&[i16]>,
+ def_levels: LevelDataRef<'_>,
+ rep_levels: LevelDataRef<'_>,
min: Option<&E::T>,
max: Option<&E::T>,
distinct_count: Option<u64>,
) -> Result<usize> {
// Check if number of definition levels is the same as number of
repetition levels.
- if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
- if def.len() != rep.len() {
- return Err(general_err!(
- "Inconsistent length of definition and repetition levels:
{} != {}",
- def.len(),
- rep.len()
- ));
- }
+ // Compare whenever both sides are supplied, even if one is empty (as the
+ // `Option` version did): skipping empty buffers would let a mismatch
+ // reach the mini-batch slicing below and panic instead of erroring.
+ let both_present = !matches!(def_levels, LevelDataRef::Absent)
+ && !matches!(rep_levels, LevelDataRef::Absent);
+ if both_present && def_levels.len() != rep_levels.len() {
+ return Err(general_err!(
+ "Inconsistent length of definition and repetition levels: {}
!= {}",
+ def_levels.len(),
+ rep_levels.len()
+ ));
}
// We check for DataPage limits only after we have inserted the
values. If a user
@@ -450,10 +509,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a,
E> {
// TODO: find out why we don't account for size of levels when we
estimate page
// size.
-
- let num_levels = match def_levels {
- Some(def_levels) => def_levels.len(),
- None => values.len(),
+ let num_levels = def_levels.len().max(rep_levels.len());
+ let num_levels = if num_levels > 0 {
+ num_levels
+ } else {
+ value_indices.map_or(values.len(), |i| i.len()) // no levels: one per written value
};
if let Some(min) = min {
@@ -472,13 +532,26 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a,
E> {
let mut values_offset = 0;
let mut levels_offset = 0;
- let base_batch_size = self.props.write_batch_size();
+ let both_levels_compact = !matches!(def_levels,
LevelDataRef::Materialized(_))
+ && !matches!(rep_levels, LevelDataRef::Materialized(_));
+ let has_levels = !matches!(def_levels, LevelDataRef::Absent)
+ || !matches!(rep_levels, LevelDataRef::Absent);
+ // When both level vectors are compact (Uniform or Absent), there is no
+ // materialized slice to split and the per-mini-batch work is O(1), so
+ // mini-batches can be sized by the page row-count limit rather than the
+ // write batch size. Clamp to at least one level: a zero limit would keep
+ // `end_offset == levels_offset` and the loop below would never advance.
+ let base_batch_size = if both_levels_compact && has_levels {
+ self.props.data_page_row_count_limit().max(1)
+ } else {
+ self.props.write_batch_size()
+ };
while levels_offset < num_levels {
let mut end_offset = num_levels.min(levels_offset +
base_batch_size);
// Split at record boundary
- if let Some(r) = rep_levels {
- while end_offset < r.len() && r[end_offset] != 0 {
+ if let LevelDataRef::Materialized(levels) = rep_levels {
+ while end_offset < levels.len() && levels[end_offset] != 0 {
end_offset += 1;
}
}
@@ -488,8 +559,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
values_offset,
value_indices,
end_offset - levels_offset,
- def_levels.map(|lv| &lv[levels_offset..end_offset]),
- rep_levels.map(|lv| &lv[levels_offset..end_offset]),
+ def_levels.slice(levels_offset, end_offset - levels_offset), // same variant, narrowed to this mini-batch
+ rep_levels.slice(levels_offset, end_offset - levels_offset), // same variant, narrowed to this mini-batch
)?;
levels_offset = end_offset;
}
@@ -516,7 +587,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E>
{
def_levels: Option<&[i16]>,
rep_levels: Option<&[i16]>,
) -> Result<usize> {
- self.write_batch_internal(values, None, def_levels, rep_levels, None,
None, None)
+ self.write_batch_internal(
+ values,
+ None,
+ LevelDataRef::from(def_levels), // None -> Absent, Some -> Materialized
+ LevelDataRef::from(rep_levels), // None -> Absent, Some -> Materialized
+ None,
+ None,
+ None,
+ )
}
/// Writer may optionally provide pre-calculated statistics for use when
computing
@@ -538,8 +617,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
self.write_batch_internal(
values,
None,
- def_levels,
- rep_levels,
+ LevelDataRef::from(def_levels), // None -> Absent, Some -> Materialized
+ LevelDataRef::from(rep_levels), // None -> Absent, Some -> Materialized
min,
max,
distinct_count,
@@ -650,32 +729,53 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a,
E> {
values_offset: usize,
value_indices: Option<&[usize]>,
num_levels: usize,
- def_levels: Option<&[i16]>,
- rep_levels: Option<&[i16]>,
+ def_levels: LevelDataRef<'_>,
+ rep_levels: LevelDataRef<'_>,
) -> Result<usize> {
// Process definition levels and determine how many values to write.
let values_to_write = if self.descr.max_def_level() > 0 {
- let levels = def_levels.ok_or_else(|| {
- general_err!(
- "Definition levels are required, because max definition
level = {}",
- self.descr.max_def_level()
- )
- })?;
-
- let mut values_to_write = 0usize;
let max_def = self.descr.max_def_level();
- let encoder = &mut self.def_levels_encoder;
- match self.page_metrics.definition_level_histogram.as_mut() {
- Some(histogram) => encoder.put_with_observer(levels, |level,
count| {
- values_to_write += count * (level == max_def) as usize;
- histogram.increment_by(level, count as i64);
- }),
- None => encoder.put_with_observer(levels, |level, count| {
- values_to_write += count * (level == max_def) as usize;
- }),
- };
- self.page_metrics.num_page_nulls += (levels.len() -
values_to_write) as u64;
- values_to_write
+ match def_levels {
+ LevelDataRef::Absent => { // required because max_def_level > 0
+ return Err(general_err!(
+ "Definition levels are required, because max
definition level = {}",
+ self.descr.max_def_level()
+ ));
+ }
+ LevelDataRef::Materialized(levels) => {
+ // General path for caller-provided or already-materialized
+ // level buffers.
+ let mut values_to_write = 0usize;
+ let encoder = &mut self.def_levels_encoder;
+ match
self.page_metrics.definition_level_histogram.as_mut() {
+ Some(histogram) => encoder.put_with_observer(levels,
|level, count| {
+ values_to_write += count * (level == max_def) as
usize;
+ histogram.increment_by(level, count as i64);
+ }),
+ None => encoder.put_with_observer(levels, |level,
count| {
+ values_to_write += count * (level == max_def) as
usize;
+ }),
+ };
+ self.page_metrics.num_page_nulls += (levels.len() -
values_to_write) as u64;
+ values_to_write
+ }
+ LevelDataRef::Uniform { value, count } => {
+ // Fast path for all-null, all-valid, or otherwise uniform
+ // definition levels without materializing a level buffer.
+ let encoder = &mut self.def_levels_encoder;
+ match
self.page_metrics.definition_level_histogram.as_mut() {
+ Some(histogram) => {
+ encoder.put_n_with_observer(value, count, |level,
run_len| {
+ histogram.increment_by(level, run_len as i64);
+ })
+ }
+ None => encoder.put_n_with_observer(value, count, |_,
_| {}),
+ };
+ let values_to_write = count * (value == max_def) as usize; // every slot is valid, or none is
+ self.page_metrics.num_page_nulls += (count -
values_to_write) as u64;
+ values_to_write
+ }
+ }
} else {
num_levels
};
@@ -683,31 +783,50 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a,
E> {
// Process repetition levels and determine how many rows we are about
to process.
if self.descr.max_rep_level() > 0 {
// A row could contain more than one value.
- let levels = rep_levels.ok_or_else(|| {
+ let first_level = rep_levels.first().ok_or_else(|| { // None for Absent and for empty levels
general_err!(
"Repetition levels are required, because max repetition
level = {}",
self.descr.max_rep_level()
)
})?;
- if !levels.is_empty() && levels[0] != 0 {
+ if first_level != 0 {
return Err(general_err!(
"Write must start at a record boundary, got non-zero
repetition level of {}",
- levels[0]
+ first_level
));
}
let mut new_rows = 0u32;
- let encoder = &mut self.rep_levels_encoder;
- match self.page_metrics.repetition_level_histogram.as_mut() {
- Some(histogram) => encoder.put_with_observer(levels, |level,
count| {
- new_rows += (count as u32) * (level == 0) as u32;
- histogram.increment_by(level, count as i64);
- }),
- None => encoder.put_with_observer(levels, |level, count| {
- new_rows += (count as u32) * (level == 0) as u32;
- }),
- };
+ match rep_levels {
+ LevelDataRef::Absent => unreachable!(), // first() above already rejected Absent
+ LevelDataRef::Materialized(levels) => {
+ let encoder = &mut self.rep_levels_encoder;
+ match
self.page_metrics.repetition_level_histogram.as_mut() {
+ Some(histogram) => encoder.put_with_observer(levels,
|level, count| {
+ new_rows += (count as u32) * (level == 0) as u32;
+ histogram.increment_by(level, count as i64);
+ }),
+ None => encoder.put_with_observer(levels, |level,
count| {
+ new_rows += (count as u32) * (level == 0) as u32;
+ }),
+ };
+ }
+ LevelDataRef::Uniform { value, count } => { // value is 0 here, checked via first_level above
+ let encoder = &mut self.rep_levels_encoder;
+ match
self.page_metrics.repetition_level_histogram.as_mut() {
+ Some(histogram) => {
+ encoder.put_n_with_observer(value, count, |level,
run_len| {
+ new_rows += (run_len as u32) * (level == 0) as
u32;
+ histogram.increment_by(level, run_len as i64);
+ })
+ }
+ None => encoder.put_n_with_observer(value, count,
|level, run_len| {
+ new_rows += (run_len as u32) * (level == 0) as u32;
+ }),
+ };
+ }
+ }
self.page_metrics.num_buffered_rows += new_rows;
} else {
// Each value is exactly one row.
@@ -4451,4 +4570,221 @@ mod tests {
result.metadata.compressed_size()
);
}
+
+ struct ColumnRoundTripUniform<'a, T: DataType> { // builder for write-then-read roundtrip tests
+ props: WriterProperties,
+ values: &'a [T::T],
+ def_levels: LevelDataRef<'a>,
+ rep_levels: LevelDataRef<'a>,
+ max_def_level: i16,
+ max_rep_level: i16,
+ expected_values: &'a [T::T],
+ expected_def_levels: Option<&'a [i16]>,
+ expected_rep_levels: Option<&'a [i16]>,
+ }
+
+ impl<'a, T: DataType> ColumnRoundTripUniform<'a, T>
+ where
+ T::T: PartialEq + std::fmt::Debug,
+ {
+ fn new() -> Self {
+ Self {
+ props: Default::default(),
+ values: &[],
+ def_levels: LevelDataRef::Absent,
+ rep_levels: LevelDataRef::Absent,
+ max_def_level: 0,
+ max_rep_level: 0,
+ expected_values: &[],
+ expected_def_levels: None,
+ expected_rep_levels: None,
+ }
+ }
+
+ fn with_props(mut self, props: WriterProperties) -> Self {
+ self.props = props;
+ self
+ }
+
+ fn with_values(mut self, values: &'a [T::T]) -> Self {
+ self.values = values;
+ self
+ }
+
+ fn with_def_levels(mut self, def_levels: LevelDataRef<'a>) -> Self {
+ self.def_levels = def_levels;
+ self
+ }
+
+ fn with_rep_levels(mut self, rep_levels: LevelDataRef<'a>) -> Self {
+ self.rep_levels = rep_levels;
+ self
+ }
+
+ fn with_max_def_level(mut self, max_def_level: i16) -> Self {
+ self.max_def_level = max_def_level;
+ self
+ }
+
+ fn with_max_rep_level(mut self, max_rep_level: i16) -> Self {
+ self.max_rep_level = max_rep_level;
+ self
+ }
+
+ fn with_expected_values(mut self, expected_values: &'a [T::T]) -> Self
{
+ self.expected_values = expected_values;
+ self
+ }
+
+ fn with_expected_def_levels(mut self, expected_def_levels: &'a [i16])
-> Self {
+ self.expected_def_levels = Some(expected_def_levels);
+ self
+ }
+
+ fn with_expected_rep_levels(mut self, expected_rep_levels: &'a [i16])
-> Self {
+ self.expected_rep_levels = Some(expected_rep_levels);
+ self
+ }
+
+ /// Write-then-read roundtrip using `write_batch_internal` with the
given
+ /// [`LevelDataRef`] variants, and assert the read-back matches
`expected_*`.
+ fn run(self) {
+ let mut file = tempfile::tempfile().unwrap();
+ let mut write = TrackedWrite::new(&mut file);
+ let page_writer = Box::new(SerializedPageWriter::new(&mut write));
+ let mut writer = get_test_column_writer::<T>(
+ page_writer,
+ self.max_def_level,
+ self.max_rep_level,
+ Arc::new(self.props),
+ );
+
+ writer
+ .write_batch_internal(
+ self.values,
+ None,
+ self.def_levels,
+ self.rep_levels,
+ None,
+ None,
+ None,
+ )
+ .unwrap();
+ let result = writer.close().unwrap(); // metadata and rows_written feed the reader below
+ drop(write); // release the borrow of `file` before reading it back
+
+ let props = ReaderProperties::builder()
+ .set_backward_compatible_lz4(false)
+ .build();
+ let page_reader = Box::new(
+ SerializedPageReader::new_with_properties(
+ Arc::new(file),
+ &result.metadata,
+ result.rows_written as usize,
+ None,
+ Arc::new(props),
+ )
+ .unwrap(),
+ );
+ let mut reader =
+ get_test_column_reader::<T>(page_reader, self.max_def_level,
self.max_rep_level);
+
+ let batch_size = self
+ .expected_def_levels
+ .map_or(self.expected_values.len(), |l| l.len());
+ let mut actual_values = Vec::with_capacity(batch_size);
+ let mut actual_def = self
+ .expected_def_levels
+ .map(|_| Vec::with_capacity(batch_size));
+ let mut actual_rep = self
+ .expected_rep_levels
+ .map(|_| Vec::with_capacity(batch_size));
+
+ let (_, values_read, levels_read) = reader
+ .read_records(
+ batch_size,
+ actual_def.as_mut(),
+ actual_rep.as_mut(),
+ &mut actual_values,
+ )
+ .unwrap();
+
+ assert_eq!(&actual_values[..values_read], self.expected_values);
+ if let Some(ref v) = actual_def {
+ assert_eq!(&v[..levels_read],
self.expected_def_levels.unwrap());
+ }
+ if let Some(ref v) = actual_rep {
+ assert_eq!(&v[..levels_read],
self.expected_rep_levels.unwrap());
+ }
+ }
+ }
+
+ #[test]
+ fn test_uniform_def_levels_all_null() {
+ // All-null column: def_level=0 (null) for every slot, no values
written.
+ let max_def_level = 1;
+ let count = 100;
+ let expected_def_levels = vec![0i16; count];
+ ColumnRoundTripUniform::<Int32Type>::new()
+ .with_def_levels(LevelDataRef::Uniform { value: 0, count })
+ .with_max_def_level(max_def_level)
+ .with_expected_def_levels(&expected_def_levels)
+ .run();
+ }
+
+ #[test]
+ fn test_uniform_def_levels_all_valid() {
+ // All-valid column: def_level=max for every slot, all values written.
+ let max_def_level = 1;
+ let values: Vec<i32> = (0..50).collect();
+ let expected_def_levels = vec![max_def_level; values.len()];
+ ColumnRoundTripUniform::<Int32Type>::new()
+ .with_values(&values)
+ .with_def_levels(LevelDataRef::Uniform {
+ value: max_def_level,
+ count: values.len(),
+ })
+ .with_max_def_level(max_def_level)
+ .with_expected_values(&values)
+ .with_expected_def_levels(&expected_def_levels)
+ .run();
+ }
+
+ #[test]
+ fn test_uniform_def_and_rep_levels() {
+ // Simulates a list column where every row is null:
+ // def=0, rep=0 for each row (one row = one entry with no child
values).
+ let max_def_level = 2;
+ let max_rep_level = 1;
+ let count = 200;
+ let expected_def_levels = vec![0i16; count];
+ let expected_rep_levels = vec![0i16; count];
+ ColumnRoundTripUniform::<Int32Type>::new()
+ .with_def_levels(LevelDataRef::Uniform { value: 0, count })
+ .with_rep_levels(LevelDataRef::Uniform { value: 0, count })
+ .with_max_def_level(max_def_level)
+ .with_max_rep_level(max_rep_level)
+ .with_expected_def_levels(&expected_def_levels)
+ .with_expected_rep_levels(&expected_rep_levels)
+ .run();
+ }
+
+ #[test]
+ fn test_uniform_levels_v1_and_v2() {
+ // Verify uniform levels work identically for both Parquet writer
versions.
+ for version in [WriterVersion::PARQUET_1_0,
WriterVersion::PARQUET_2_0] {
+ let props = WriterProperties::builder()
+ .set_writer_version(version) // V1 vs V2 level encoding
+ .build();
+ let max_def = 1;
+ let count = 100;
+ let expected_def_levels = vec![0i16; count];
+ ColumnRoundTripUniform::<Int32Type>::new()
+ .with_props(props)
+ .with_def_levels(LevelDataRef::Uniform { value: 0, count })
+ .with_max_def_level(max_def)
+ .with_expected_def_levels(&expected_def_levels)
+ .run();
+ }
+ }
}
diff --git a/parquet/src/encodings/levels.rs b/parquet/src/encodings/levels.rs
index 6b6de8b0e1..841afd8b31 100644
--- a/parquet/src/encodings/levels.rs
+++ b/parquet/src/encodings/levels.rs
@@ -92,6 +92,37 @@ impl LevelEncoder {
}
}
+ /// Encode `count` repetitions of a single level value, calling
+ /// `observer(value, count)` exactly once.
+ ///
+ /// This can be used to encode uniform runs without allocating a level
+ /// buffer. Like [`Self::put_with_observer`], but specialized for a single
+ /// repeated level value.
+ ///
+ /// This is O(1) amortized for RLE-based encoders (after a small warmup).
+ #[inline]
+ pub fn put_n_with_observer<F>(&mut self, value: i16, count: usize, mut
observer: F)
+ where
+ F: FnMut(i16, usize),
+ {
+ let encoder = match *self {
+ LevelEncoder::Rle(ref mut encoder) | LevelEncoder::RleV2(ref mut
encoder) => encoder,
+ };
+
+ // Feed values individually until the encoder enters RLE accumulation
+ // mode for this value, or until we've encoded everything.
+ let mut remaining = count;
+ while remaining > 0 && !encoder.is_accumulating_rle(value as u64) {
+ encoder.put(value as u64);
+ remaining -= 1;
+ }
+ // If we're now in accumulation mode, bulk-extend the rest.
+ if remaining > 0 {
+ encoder.extend_run(remaining); // only reached once the loop above saw accumulation mode
+ }
+ observer(value, count); // invoked even when count == 0
+ }
+
/// Finalizes level encoder, flush all intermediate buffers and return
resulting
/// encoded buffer. Returned buffer is already truncated to encoded bytes
only.
#[inline]
@@ -139,3 +170,116 @@ impl LevelEncoder {
result
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ /// Encode `values` with `put_with_observer` on a fresh V2 streaming
+ /// encoder and return the raw encoded bytes.
+ fn reference_encode(max_level: i16, values: &[i16]) -> Vec<u8> {
+ let mut enc = LevelEncoder::v2_streaming(max_level);
+ enc.put_with_observer(values, |_, _| {});
+ enc.consume()
+ }
+
+ #[test]
+ fn test_put_n_with_observer_large_run() {
+ // Large count exercises the bulk extend_run path (past the 8-value
warmup).
+ let max_level = 3;
+ let count = 10_000;
+ let mut enc = LevelEncoder::v2_streaming(max_level);
+ enc.put_n_with_observer(2, count, |_, _| {});
+ assert_eq!(enc.consume(), reference_encode(max_level, &vec![2;
count]));
+ }
+
+ #[test]
+ fn test_put_n_with_observer_small_count() {
+ // Count smaller than the RLE warmup threshold — only the per-element
loop runs.
+ let max_level = 3;
+ let mut enc = LevelEncoder::v2_streaming(max_level);
+ enc.put_n_with_observer(1, 5, |_, _| {});
+ assert_eq!(enc.consume(), reference_encode(max_level, &[1; 5]));
+ }
+
+ #[test]
+ fn test_put_n_with_observer_exact_threshold() {
+ // Exactly 8 values; `extend_run` is only called if any remain after warmup.
+ let max_level = 3;
+ let mut enc = LevelEncoder::v2_streaming(max_level);
+ enc.put_n_with_observer(3, 8, |_, _| {});
+ assert_eq!(enc.consume(), reference_encode(max_level, &[3; 8]));
+ }
+
+ #[test]
+ fn test_put_n_with_observer_single_value() {
+ let max_level = 1;
+ let mut enc = LevelEncoder::v2_streaming(max_level);
+ enc.put_n_with_observer(1, 1, |_, _| {});
+ assert_eq!(enc.consume(), reference_encode(max_level, &[1]));
+ }
+
+ #[test]
+ fn test_put_n_with_observer_zero_count() {
+ let max_level = 3;
+ let mut enc = LevelEncoder::v2_streaming(max_level);
+ enc.put_n_with_observer(2, 0, |_, _| {});
+ assert_eq!(enc.consume(), reference_encode(max_level, &[]));
+ }
+
+ #[test]
+ fn test_put_n_with_observer_calls_observer_exactly_once() {
+ let mut enc = LevelEncoder::v2_streaming(3);
+ let mut calls: Vec<(i16, usize)> = Vec::new();
+ enc.put_n_with_observer(2, 500, |val, cnt| calls.push((val, cnt)));
+ assert_eq!(calls, vec![(2, 500)]);
+ }
+
+ #[test]
+ fn test_put_n_with_observer_zero_count_calls_observer() {
+ let mut enc = LevelEncoder::v2_streaming(3);
+ let mut calls: Vec<(i16, usize)> = Vec::new();
+ enc.put_n_with_observer(1, 0, |val, cnt| calls.push((val, cnt)));
+ assert_eq!(calls, vec![(1, 0)]);
+ }
+
+ #[test]
+ fn test_put_n_with_observer_followed_by_different_value() {
+ // Two consecutive put_n calls with different values — verifies that
+ // the encoder correctly transitions between runs.
+ let max_level = 3;
+ let mut enc = LevelEncoder::v2_streaming(max_level);
+ let mut calls: Vec<(i16, usize)> = Vec::new();
+ enc.put_n_with_observer(1, 100, |v, c| calls.push((v, c)));
+ enc.put_n_with_observer(3, 200, |v, c| calls.push((v, c)));
+ assert_eq!(calls, vec![(1, 100), (3, 200)]);
+
+ let reference = reference_encode(max_level, &[&[1i16; 100][..],
&[3i16; 200]].concat());
+ assert_eq!(enc.consume(), reference);
+ }
+
+ #[test]
+ fn test_put_n_with_observer_interleaved_with_put_with_observer() {
+ // Mix put_n_with_observer and put_with_observer to verify they
compose.
+ let max_level = 3;
+ let mut enc = LevelEncoder::v2_streaming(max_level);
+ enc.put_n_with_observer(2, 50, |_, _| {});
+ enc.put_with_observer(&[0, 0, 1, 1, 3], |_, _| {});
+ enc.put_n_with_observer(2, 50, |_, _| {});
+
+ let input = [&[2i16; 50][..], &[0, 0, 1, 1, 3], &[2i16; 50]].concat();
+ assert_eq!(enc.consume(), reference_encode(max_level, &input));
+ }
+
+ #[test]
+ fn test_put_n_with_observer_v1_roundtrip() {
+ // Also verify V1 (Rle variant with length header) matches put_with_observer byte-for-byte.
+ let max_level = 3;
+ let mut enc = LevelEncoder::v1_streaming(max_level);
+ enc.put_n_with_observer(2, 1000, |_, _| {});
+
+ let mut ref_enc = LevelEncoder::v1_streaming(max_level);
+ ref_enc.put_with_observer(&[2; 1000], |_, _| {});
+ assert_eq!(enc.consume(), ref_enc.consume());
+ }
+}