zhang2014 opened a new issue, #9957:
URL: https://github.com/apache/arrow-rs/issues/9957
**Is your feature request related to a problem or challenge? Please describe
what you are trying to do.**
I want to write **a large record batch into a parquet file consisting of a
single (arbitrarily large) row group**. A typical use case is log data — a
single parquet file can reach several GB to tens of GB.
The ideal memory profile is:
- columns processed strictly sequentially, only one leaf column being
encoded at any time
- for each column, pages are flushed to the file sink as soon as they are
full
- peak memory is on the order of one encoder's in-progress page,
**independent of total row count and column count**
This is the natural shape of the problem: a single row group means no
cross-row-group buffering; sequential columns means no cross-column buffering;
page-level streaming means no cross-page buffering. For files in the several-GB
to tens-of-GB range, the absence of this streaming capability means writer peak
memory scales with the total row group size instead of converging on a single
page.
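To make the target memory profile concrete, here is a toy, self-contained model of the page-streamed shape (the types are illustrative stand-ins, not the parquet crate's API): values flow into one in-progress page, and each page is flushed to the sink the moment it fills, so peak buffering stays near one page no matter how many rows pass through.

```rust
use std::io::Write;

// Toy model of the desired write shape; illustrative only, not parquet's API.
// One column at a time, each page flushed as soon as it fills, so peak
// buffering is ~one page regardless of total row count.
struct ToyPageEncoder {
    page: Vec<u8>,
    page_limit: usize,
    peak: usize, // largest in-memory page observed
}

impl ToyPageEncoder {
    fn new(page_limit: usize) -> Self {
        Self { page: Vec::new(), page_limit, peak: 0 }
    }

    fn write_value(&mut self, v: &[u8], sink: &mut impl Write) -> std::io::Result<()> {
        self.page.extend_from_slice(v);
        self.peak = self.peak.max(self.page.len());
        if self.page.len() >= self.page_limit {
            // Flush the full page immediately instead of buffering the chunk.
            sink.write_all(&self.page)?;
            self.page.clear();
        }
        Ok(())
    }
}

fn main() -> std::io::Result<()> {
    let mut sink = Vec::new(); // stand-in for the file
    let mut enc = ToyPageEncoder::new(1024);
    for _ in 0..1_000_000 {
        enc.write_value(&[0u8; 8], &mut sink)?;
    }
    // The "file" grows to ~8 MB while peak buffering stayed at one page.
    println!("sink bytes: {}, peak buffered: {}", sink.len(), enc.peak);
    Ok(())
}
```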
Neither of the existing public paths supports this:
**`ArrowWriter`** — `ArrowWriter::write(batch)` pushes data row-wise into
every column's encoder. `ArrowColumnWriter` internally uses `ArrowPageWriter`,
which buffers every compressed page into a `SharedColumnChunk` until `close()`
is called, then appends the whole column chunk to the file sink via
`ArrowColumnChunk::append_to_row_group`. Peak memory = Σ(compressed bytes of
every leaf column's chunk) + encoder state for every column. For a 10 GB target
file with a wide schema, even with 5× compression the combined compressed
column chunks can reach the 2 GB range. There is no knob to make
`ArrowPageWriter` flush pages as they are produced.
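The 2 GB figure is just the compression arithmetic applied to the hypothetical sizing above (~10 GB of data, 5× compression — illustrative numbers, not measurements):

```rust
fn main() {
    // Hypothetical sizing from the text: ~10 GB of data, 5x compression.
    // ArrowPageWriter holds every compressed page until close(), so the
    // whole compressed row group is resident at peak.
    let uncompressed: u64 = 10 * 1024 * 1024 * 1024; // 10 GiB
    let compression_ratio: u64 = 5;
    let buffered = uncompressed / compression_ratio;
    println!("~{} GiB of compressed chunks buffered", buffered >> 30);
}
```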
**`SerializedFileWriter` + `next_column()` +
`SerializedColumnWriter::typed::<T>()`** — this is the natural low-level path.
`SerializedPageWriter` (the default page writer under `next_column()`) already
flushes each compressed page directly to the file sink, which is exactly the
behavior I want. But two problems block this path:
***Problem 1: `SerializedColumnWriter` hardcodes the generic encoder.***
```rust
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;

pub enum ColumnWriter<'a> {
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    // ...
}
```
Going through `next_column()` locks in `ColumnValueEncoderImpl<T>`. To use
`ByteArrayEncoder` (the specialized zero-copy byte array encoder that
`ArrowColumnWriter` uses internally), I need to construct
`GenericColumnWriter<ByteArrayEncoder>` myself via a factory — but
`SerializedRowGroupWriter::next_column_with_factory` is `pub(crate)`:
```rust
pub(crate) fn next_column_with_factory<'b, F, C>(
    &'b mut self,
    factory: F,
) -> Result<Option<C>>
where
    F: FnOnce(
        ColumnDescPtr,
        WriterPropertiesPtr,
        Box<dyn PageWriter + 'b>,
        OnCloseColumnChunk<'b>,
    ) -> Result<C>,
```
The signature is already fully generic in the return type `C`; the only
barrier is `pub(crate)`.
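A minimal stand-in illustrates why the generic return type is all that is needed (all types below are toy stand-ins, not the parquet crate's): a method generic in `C` lets the caller decide the concrete column-writer type, while the file still wires up the parts it hands to the factory.

```rust
// Toy stand-ins for the factory pattern described above; none of these are
// the parquet crate's real types.
struct PageSink; // plays the role of Box<dyn PageWriter>

struct RowGroup {
    next_ordinal: usize,
}

impl RowGroup {
    // Mirrors the shape of next_column_with_factory: fully generic in the
    // returned writer C, so the caller chooses the concrete type.
    fn next_column_with_factory<F, C>(&mut self, factory: F) -> Option<C>
    where
        F: FnOnce(usize, PageSink) -> C,
    {
        let ordinal = self.next_ordinal;
        self.next_ordinal += 1;
        Some(factory(ordinal, PageSink))
    }
}

// A caller-defined writer type the file never needs to know about.
struct MyCustomWriter {
    ordinal: usize,
    _sink: PageSink,
}

fn main() {
    let mut rg = RowGroup { next_ordinal: 0 };
    let w: MyCustomWriter = rg
        .next_column_with_factory(|ordinal, sink| MyCustomWriter { ordinal, _sink: sink })
        .unwrap();
    println!("got writer for column {}", w.ordinal);
}
```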
***Problem 2: `write_batch` requires a dense value array, discarding
null-handling information I already have.***
Arrow arrays naturally produce `(dense values, non-null indices)` pairs when
computing definition/repetition levels (see `ArrayLevels::non_null_indices()`).
The public API is:
```rust
pub fn write_batch(
    &mut self,
    values: &E::Values,
    def_levels: Option<&[i16]>,
    rep_levels: Option<&[i16]>,
) -> Result<usize>
```
There is no way for external callers to say "take values at these indices".
The internal `write_batch_internal` has exactly this entry point:
```rust
pub(crate) fn write_batch_internal(
    &mut self,
    values: &E::Values,
    value_indices: Option<&[usize]>, // <-- this
    def_levels: LevelDataRef<'_>,
    rep_levels: LevelDataRef<'_>,
    min: Option<&E::T>,
    max: Option<&E::T>,
    distinct_count: Option<u64>,
) -> Result<usize>
```
This is exactly the path `ArrowColumnWriter` uses internally via
`write_primitive` to avoid materializing a gathered copy. External callers can
only gather into a scratch `Vec<E::T>` and call `write_batch`, or go through
`ArrowColumnWriter` and accept the column-chunk buffering.
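The gather workaround looks like this (a sketch with illustrative names, not the parquet crate's API): holding `(dense values, non-null indices)`, the caller must materialize a scratch copy just to satisfy `write_batch`'s dense-values signature.

```rust
// Sketch of the workaround described above: with only the public write_batch,
// a caller holding (dense values, non-null indices) must gather into a
// scratch Vec first. Names are illustrative stand-ins, not parquet's API.
fn gather_into_scratch<T: Copy>(values: &[T], non_null_indices: &[usize]) -> Vec<T> {
    // Extra allocation of non_null_indices.len() * size_of::<T>() bytes
    // that the internal value_indices path avoids.
    non_null_indices.iter().map(|&i| values[i]).collect()
}

fn main() {
    let values = [10i64, 0, 20, 0, 30]; // dense values; slots 1 and 3 are null
    let non_null = [0usize, 2, 4];
    let scratch = gather_into_scratch(&values, &non_null);
    assert_eq!(scratch, vec![10, 20, 30]);
    println!(
        "scratch copy: {} bytes",
        scratch.len() * std::mem::size_of::<i64>()
    );
}
```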
**Describe the solution you'd like**
One visibility change plus two thin wrapper methods. No implementation changes,
no behavior changes.
***1. `SerializedRowGroupWriter::next_column_with_factory` → `pub`***
```diff
 impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
-    pub(crate) fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> Result<Option<C>>
+    pub fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> Result<Option<C>>
 }
```
This lets callers plug in their own `GenericColumnWriter<E>` (including
`GenericColumnWriter<ByteArrayEncoder>`) with the file's `SerializedPageWriter`
already correctly wired up by arrow-rs internally.
***2. New `pub fn write_batch_with_indices` on `GenericColumnWriter`***
```rust
impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
    pub fn write_batch_with_indices(
        &mut self,
        values: &E::Values,
        indices: &[usize],
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        self.write_batch_internal(
            values,
            Some(indices),
            LevelDataRef::from(def_levels),
            LevelDataRef::from(rep_levels),
            None,
            None,
            None,
        )
    }

    pub fn write_batch_with_indices_and_statistics(
        &mut self,
        values: &E::Values,
        indices: &[usize],
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> { ... }
}
```
Exposes the existing `value_indices` code path. `write_batch` already
delegates to `write_batch_internal` with `value_indices: None`; the new API
just exposes the `Some(indices)` variant as a public wrapper.
**Describe alternatives you've considered**
*Use `ArrowColumnWriter` as-is.* This is the right default for most callers,
but not when the file is designed to contain a single large row group (as in
log workloads reaching several GB to tens of GB per file), columns are
processed strictly sequentially, and peak memory is dominated by
per-column-chunk buffering rather than encoding cost. In this shape, the
natural memory ceiling is one encoder's in-progress page, not one column chunk.
*Pre-gather into a dense `Vec<T>` before `write_batch`.* Works for
fixed-width types, but gathering K non-null values wastes `K × sizeof(T)`
bytes of scratch memory that the `value_indices` path would save. For
`ByteArrayType` the scratch is a `Vec<ByteArray>` (~32 bytes per row) on top
of the underlying buffers.
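A rough size check for the ~32-bytes-per-row figure on a 64-bit target (the struct below only mimics the layout of a refcounted byte-array handle — data pointer, length, and two words of shared-buffer bookkeeping; it is not parquet's actual `ByteArray` definition):

```rust
// Layout stand-in for a refcounted byte-array handle; illustrative only,
// not parquet's ByteArray. On a 64-bit target: 8 + 8 + 16 = 32 bytes.
struct FakeByteArray {
    _ptr: *const u8,
    _len: usize,
    _shared: [usize; 2], // shared-buffer bookkeeping words
}

fn main() {
    let per_row = std::mem::size_of::<FakeByteArray>();
    println!("{} bytes per gathered row", per_row);
    // At 100M rows, the scratch handles alone would need ~3.2 GB.
    println!("{} bytes for 100M rows", per_row * 100_000_000);
}
```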
*Reimplement parquet encoding downstream.* The Arrow → Parquet dispatch
(`write_leaf`), level computation, and byte array encoders are non-trivial and
evolve with the format spec. Not a viable long-term maintenance path.
**Additional context**
The `SerializedFileWriter` + `SerializedPageWriter` infrastructure already
supports page-streamed column writes end-to-end — what's missing is just the
plumbing to connect it with Arrow input and the existing specialized encoders.
Happy to submit a PR if this direction is acceptable.