etseidl commented on code in PR #9653:
URL: https://github.com/apache/arrow-rs/pull/9653#discussion_r3030621645
##########
parquet/src/column/writer/mod.rs:
##########
@@ -676,16 +735,16 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a,
E> {
)
})?;
- let values_to_write = levels
- .iter()
- .map(|level| (*level == self.descr.max_def_level()) as usize)
- .sum();
+ let mut values_to_write = 0usize;
+ let max_def = self.descr.max_def_level();
+ self.def_levels_encoder
+ .put_with_observer(levels, |level, count| {
Review Comment:
❤️ When I added the histograms I wasn't happy with the redundancy here. Nice
fix!
##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -645,6 +718,7 @@ impl LevelInfoBuilder {
);
}
None => {
+ info.non_null_indices.reserve(len);
let iter = std::iter::repeat_n(info.max_def_level,
len);
def_levels.extend(iter);
Review Comment:
Could this case (which I think is nullable but no nulls) also make use of
the uniform levels?
##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -625,13 +690,21 @@ impl LevelInfoBuilder {
fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
let len = range.end - range.start;
+ // Fast path: entire leaf array is null
+ if let Some(nulls) = &info.logical_nulls {
+ if info.def_levels.is_some() && nulls.null_count() == nulls.len() {
+ info.extend_uniform_null_levels(info.max_def_level - 1,
info.max_rep_level, len);
+ return;
+ }
+ }
+
match &mut info.def_levels {
Some(def_levels) => {
def_levels.reserve(len);
- info.non_null_indices.reserve(len);
Review Comment:
Why does this move inside the `match`? Both arms call this anyway.
##########
parquet/src/encodings/levels.rs:
##########
@@ -57,37 +57,89 @@ impl LevelEncoder {
///
/// Unlike [`v2`](Self::v2), this does not require knowing the number of
values
/// upfront, making it suitable for incremental encoding where levels are
fed in
- /// as they arrive via [`put`](Self::put).
+ /// as they arrive via [`put_with_observer`](Self::put_with_observer).
pub fn v2_streaming(max_level: i16) -> Self {
let bit_width = num_required_bits(max_level as u64);
LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, Vec::new()))
}
- /// Put/encode levels vector into this level encoder.
- /// Returns number of encoded values that are less than or equal to length
of the
- /// input buffer.
+ /// Put/encode levels vector into this level encoder and calls
+ /// `observer(value, count)` for each run of identical values encountered
+ /// during encoding.
+ /// Returns number of encoded values that are less than or equal to length
+ /// of the input buffer.
///
/// This method does **not** flush the underlying encoder, so it can be
called
/// incrementally across multiple batches without forcing run boundaries.
/// The encoder is flushed automatically when [`consume`](Self::consume)
is called.
#[inline]
- pub fn put(&mut self, buffer: &[i16]) -> usize {
- let mut num_encoded = 0;
+ pub fn put_with_observer<F>(&mut self, buffer: &[i16], mut observer: F) ->
usize
+ where
+ F: FnMut(i16, usize),
+ {
match *self {
LevelEncoder::Rle(ref mut encoder) | LevelEncoder::RleV2(ref mut
encoder) => {
- for value in buffer {
- encoder.put(*value as u64);
- num_encoded += 1;
+ let mut remaining = buffer;
+ while let Some((&value, rest)) = remaining.split_first() {
+ encoder.put(value as u64);
+ // After put(), check if the encoder just entered RLE
+ // accumulation mode. If so, scan ahead for the rest of
+ // this run to batch the observer call and bulk-extend.
+ if encoder.is_accumulating(value as u64) {
+ let run_len = rest.iter().take_while(|&&v| v ==
value).count();
+ if run_len > 0 {
+ encoder.extend_run(run_len);
+ }
+ observer(value, 1 + run_len);
+ remaining = &rest[run_len..];
+ } else {
+ observer(value, 1);
+ remaining = rest;
+ }
+ }
+ buffer.len()
+ }
+ LevelEncoder::BitPacked(bit_width, ref mut encoder) => {
+ for &value in buffer {
+ encoder.put_value(value as u64, bit_width as usize);
+ observer(value, 1);
+ }
+ buffer.len()
+ }
+ }
+ }
+
+ /// Encode `count` repetitions of a single level value, calling
+ /// `observer(value, count)` exactly once.
+ ///
+ /// This is O(1) amortized for RLE-based encoders (after a small warmup).
+ #[inline]
+ #[allow(dead_code)] // Used by [cfg(feature = "arrow")]
Review Comment:
can this just be feature gated then?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]