etseidl commented on code in PR #9972:
URL: https://github.com/apache/arrow-rs/pull/9972#discussion_r3313403153
##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -593,6 +679,141 @@ where
}
}
+/// Cumulative-scan fallback used for byte array types that don't expose
+/// a single contiguous offsets buffer — view arrays, dictionary arrays,
+/// fixed-size binary. Returns the largest `k` such that the first `k`
+/// values picked out by `indices` encode to at most `byte_budget` bytes
+/// (or `indices.len()` if they all fit, or `1` if a single value alone
+/// exceeds the budget).
+///
+/// Free function so it can be used with `downcast_op!`.
+fn count_within_budget_accessor<T>(values: T, indices: &[usize], byte_budget: usize) -> usize
+where
+ T: ArrayAccessor + Copy,
+ T::Item: AsRef<[u8]>,
+{
+ let mut cum: usize = 0;
+ for (i, idx) in indices.iter().enumerate() {
+ let value_len = values.value(*idx).as_ref().len() + std::mem::size_of::<u32>();
+ cum = cum.saturating_add(value_len);
+ if cum > byte_budget {
+ return i.max(1);
+ }
+ }
+ indices.len()
+}
+
+/// Upper bound on any single value's byte length in a view array.
+///
+/// An inline view stores at most 12 bytes; an
+/// out-of-line view's data is a contiguous slice of exactly one data
+/// buffer, so it cannot be longer than the largest data buffer. This is a
+/// loose bound (a value is usually far smaller than a whole buffer) but it
+/// is O(number of buffers) and always sound.
+fn max_view_value_len(buffers: &[Buffer]) -> usize {
+ /// Bytes that fit inline in a u128 view word (the rest is len + prefix).
+ const MAX_INLINE_VIEW_LEN: usize = 12;
+ buffers
+ .iter()
+ .map(|b| b.len())
+ .max()
+ .unwrap_or(0)
+ .max(MAX_INLINE_VIEW_LEN)
+}
+
+/// Two-stage budget count for view arrays (`Utf8View`, `BinaryView`).
+///
+/// 1. View arrays have no prefix-sum offsets buffer, so the exact O(1)
+/// span subtraction `count_within_budget_offsets` uses is unavailable.
+/// But a *conservative* O(1) bound is: every value is at most
+/// `max_value_len` bytes, so the whole chunk fits the budget when
+/// `n * (max_value_len + 4) <= byte_budget`. This skips the per-value
+/// walk for the common small-value case — what view arrays are built
+/// for, and exactly the case where there is nothing to bound.
+/// 2. Otherwise scan per-value lengths from the low 32 bits of each u128
+/// view word (no data-buffer dereference) and stop at the first value
+/// that pushes the cumulative sum past the budget.
+fn count_within_budget_views(
+ views: &[u128],
+ indices: &[usize],
+ byte_budget: usize,
+ max_value_len: usize,
+) -> usize {
+ // Stage 1: O(1) conservative upper bound.
+ let per_value = max_value_len + std::mem::size_of::<u32>();
+ if indices.len().saturating_mul(per_value) <= byte_budget {
+ return indices.len();
+ }
+ // Stage 2: exact per-value scan.
+ let mut cum: usize = 0;
+ for (i, idx) in indices.iter().enumerate() {
+ let len = (views[*idx] as u32) as usize;
+ cum = cum.saturating_add(len + std::mem::size_of::<u32>());
+ if cum > byte_budget {
+ return i.max(1);
+ }
+ }
+ indices.len()
+}
+
+/// Two-stage fast path for `GenericByteArray<O>`
+/// (Utf8/LargeUtf8/Binary/LargeBinary).
+///
+/// `indices` are assumed sorted ascending — they always are here, since
+/// they come from `non_null_indices`, which is built in array order.
+///
+/// 1. The span `offsets[last+1] - offsets[first]` is an O(1) upper
+/// bound on the chunk's payload: it covers every array position in
+/// `[first, last]`, a superset of `indices`. For a non-null chunk
+/// `indices` *is* that whole range; for a chunk drawn from a
+/// nullable column the skipped positions are nulls, whose offset
+/// delta is zero, so the span still equals the exact payload.
+/// Either way, if the upper bound fits the budget every value
+/// fits — return `indices.len()` with no per-value work. This
+/// covers the overwhelmingly common "small values" case for both
+/// non-null *and* nullable columns.
+/// 2. Otherwise the chunk is genuinely near the budget: walk per-index
+/// lengths from the offsets buffer directly (no slice/UTF-8
+/// construction) and stop at the first value that pushes the
+/// cumulative sum past the budget.
+fn count_within_budget_offsets<T: ByteArrayType>(
+ values: &GenericByteArray<T>,
+ indices: &[usize],
+ byte_budget: usize,
+) -> usize {
+ if indices.is_empty() {
+ return 0;
+ }
+ let n = indices.len();
+ let first = indices[0];
+ let last = indices[n - 1];
+ let offsets = values.value_offsets();
+ let prefix_overhead = std::mem::size_of::<u32>();
+
+ // Stage 1: O(1) span upper bound. Skips Stage 2 in the common case —
+ // including nullable columns, whose `indices` are sparse. The earlier
+ // `last - first + 1 == n` contiguity gate forced every nullable chunk
+ // onto the O(n) Stage 2 walk even though the span check is valid for
+ // any sorted index set.
+ if last >= first {
+ let payload = (offsets[last + 1] - offsets[first]).as_usize();
+ if payload + n * prefix_overhead <= byte_budget {
+ return n;
+ }
+ }
+
+ // Stage 2: scan per-index lengths from the offsets buffer.
+ let mut cum: usize = 0;
+ for (i, idx) in indices.iter().enumerate() {
+ let len = (offsets[idx + 1] - offsets[*idx]).as_usize() + prefix_overhead;
+ cum = cum.saturating_add(len);
+ if cum > byte_budget {
+ return i.max(1);
Review Comment:
I did some testing on a file consisting of 128-byte strings. With the max page
size set to 64000, I wind up with a file whose pages follow a pattern of
968/540/540 values. This is because this line returns 484 (the floor of
64000/132, where 132 is the 128-byte value plus its 4-byte length prefix). The
first mini-batch of 484 lands just under the 64k threshold, so we add the next
484 from the batch to get a page of 968. That leaves 56 rows in the batch of
1024. The next iteration appends the first 484 rows of the next batch to those
56 to get 540, and that batch then has 484+56 rows left, so we wind up writing
another 540. Then the pattern repeats.
If we instead just return `i + 1` here, that eliminates the need for the
`.max(1)` and also gives a mini-batch size just over the requested threshold.
With that change I see a pattern of 485/539 repeating.
I wonder if there's a way to smooth this out. We can't really change the batch
size being passed in, but given a batch, maybe we can add some kind of
heuristic here that first works out that multiple mini-batches are needed and
then divides the batch size by the number of mini-batches. Naively, something
like:
```rust
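if cum > byte_budget {
    // Instead of `return i + 1;`, split the batch into roughly equal
    // mini-batches. The method calls sit on `usize` expressions because
    // `1.max(..)` on a bare integer literal does not type-check.
    let num_batches = (n / (i + 1)).max(1);
    return (n / num_batches).max(1);
}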
```
This overshoots a bit, producing mini-batches of 512. Dividing by 3 instead
would undershoot, but then two mini-batches are needed to fill a page, which
winds up overshooting even more.
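
To make the page patterns above easier to check, here is a rough standalone
model. It is a sketch under stated assumptions, not the arrow-rs writer: every
value costs 132 bytes, batches hold 1024 rows, and a page is flushed as soon as
its accumulated bytes reach 64000 after a mini-batch is appended. The names
`mini_batch`, `simulate_pages`, `current`, `overshoot` and `smoothed` are
hypothetical and exist only for this illustration.

```rust
/// Mini-batch size for `n` uniform values of `value_bytes` each.
/// `pick(n, i)` chooses the size once the cumulative sum first exceeds
/// `budget` at index `i`, mirroring the `if cum > byte_budget` branch.
fn mini_batch(n: usize, value_bytes: usize, budget: usize, pick: fn(usize, usize) -> usize) -> usize {
    if n * value_bytes <= budget {
        return n; // everything fits, same as the final `indices.len()`
    }
    let i = budget / value_bytes; // index of the first value that overflows
    pick(n, i).clamp(1, n)
}

/// Strategy in the PR as quoted: stop just before the overflowing value.
fn current(_n: usize, i: usize) -> usize {
    i.max(1)
}

/// Suggested alternative: include the overflowing value.
fn overshoot(_n: usize, i: usize) -> usize {
    i + 1
}

/// Naive smoothing: split the batch into roughly equal mini-batches.
fn smoothed(n: usize, i: usize) -> usize {
    let num_batches = (n / (i + 1)).max(1);
    (n / num_batches).max(1)
}

/// Assumed writer loop (simplified): append mini-batches to the open page
/// and flush once the page reaches the limit. Returns the row count of each
/// flushed page; a trailing partial page is not reported.
fn simulate_pages(num_batches: usize, pick: fn(usize, usize) -> usize) -> Vec<usize> {
    const BATCH_ROWS: usize = 1024;
    const VALUE_BYTES: usize = 128 + 4; // payload + u32 length prefix
    const PAGE_LIMIT: usize = 64_000;

    let mut pages = Vec::new();
    let mut page_rows = 0;
    for _ in 0..num_batches {
        let mut remaining = BATCH_ROWS;
        while remaining > 0 {
            let take = mini_batch(remaining, VALUE_BYTES, PAGE_LIMIT, pick);
            page_rows += take;
            remaining -= take;
            if page_rows * VALUE_BYTES >= PAGE_LIMIT {
                pages.push(page_rows);
                page_rows = 0;
            }
        }
    }
    pages
}

fn main() {
    println!("current:   {:?}", simulate_pages(4, current));
    println!("overshoot: {:?}", simulate_pages(4, overshoot));
    println!("smoothed:  {:?}", simulate_pages(4, smoothed));
}
```

Under these assumptions the model reproduces the 968/540/540 pattern for the
current return value, settles into 539/485 after two initial pages of 485 for
`i + 1`, and gives uniform pages of 512 for the naive smoothing.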