adriangb commented on code in PR #10020: URL: https://github.com/apache/arrow-rs/pull/10020#discussion_r3350192308
########## parquet/src/column/page_store.rs: ########## @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Pluggable storage for completed, serialized page blobs. +//! +//! While a row group is being written the [`ArrowWriter`] must buffer every +//! column's encoded pages, because Parquet requires each column chunk to be +//! contiguous on disk while record batches arrive with all columns interleaved. +//! By default that buffer lives on the heap, so the writer's peak memory grows +//! with the row group size. A [`PageStore`] lets the buffer live somewhere else +//! — a local temp file, object storage, etc. — bounding peak write memory +//! independently of the row group size. +//! +//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter + +use std::fmt::Debug; + +use bytes::Bytes; + +use crate::errors::{ParquetError, Result}; + +/// An opaque, store-allocated handle to a blob held by a [`PageStore`]. +/// +/// Handles are allocated by the store — densely and sequentially — and are only +/// meaningful to the store that produced them. The caller treats them as opaque Review Comment: done in a64768fdd5 — now just says the caller treats them as opaque tokens. ########## parquet/src/column/page_store.rs: ########## @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Pluggable storage for completed, serialized page blobs. +//! +//! While a row group is being written the [`ArrowWriter`] must buffer every +//! column's encoded pages, because Parquet requires each column chunk to be +//! contiguous on disk while record batches arrive with all columns interleaved. +//! By default that buffer lives on the heap, so the writer's peak memory grows +//! with the row group size. A [`PageStore`] lets the buffer live somewhere else +//! — a local temp file, object storage, etc. — bounding peak write memory +//! independently of the row group size. +//! +//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter + +use std::fmt::Debug; + +use bytes::Bytes; + +use crate::errors::{ParquetError, Result}; + +/// An opaque, store-allocated handle to a blob held by a [`PageStore`]. +/// +/// Handles are allocated by the store — densely and sequentially — and are only +/// meaningful to the store that produced them. The caller treats them as opaque +/// tokens and decides what they *mean* (ordering, which one is the dictionary +/// page, etc.). +/// +/// Letting the store allocate the handle (rather than the caller choosing keys) Review Comment: done in a64768fdd5 — dropped the rationale paragraph. ########## parquet/src/arrow/arrow_writer/mod.rs: ########## @@ -726,12 +814,35 @@ impl PageWriter for ArrowPageWriter { spec.bytes_written = compressed_size as u64; buf.length += compressed_size; - buf.data.push(header); - buf.data.push(data); + if spec.page_type == PageType::DICTIONARY_PAGE { + // Held in memory and emitted first at splice — see + // `ArrowColumnChunkData::dictionary`. The buffer-relative offset in + // `spec` (the dictionary arrives after the data pages on this path) + // is rewritten to its true, dictionary-first position at splice. Review Comment: done in a64768fdd5 — removed the comment. ########## parquet/src/column/writer/mod.rs: ########## @@ -632,7 +632,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { /// of the current memory usage and not the final anticipated encoded size. #[cfg(feature = "arrow")] pub(crate) fn memory_size(&self) -> usize { - self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size() + // In-flight encoder buffers, plus any completed pages still held on the + // heap: the dictionary-column data pages buffered here (column-at-a-time + // path), plus whatever the page writer keeps resident. A page writer + // that spills completed pages off-heap reports far less than the bytes + // it was handed, so this tracks real memory rather than bytes written. + self.encoder.estimated_memory_size() + + self + .data_pages + .iter() + .map(|page| page.data().len()) Review Comment: done in a64768fdd5 — added `CompressedPage::memory_usage()` and use it here. ########## parquet/src/arrow/arrow_writer/mod.rs: ########## @@ -785,12 +896,39 @@ impl ArrowColumnChunk { &mut self.close } - /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data + /// Splices this column's buffered pages into the row group, streaming them + /// back out of the [`PageStore`] one page at a time. pub fn append_to_row_group<W: Write + Send>( self, writer: &mut SerializedRowGroupWriter<'_, W>, ) -> Result<()> { - writer.append_column(&self.data, self.close) + let ArrowColumnChunk { data, mut close } = self; + + // On the Arrow path the dictionary page is produced *after* the data Review Comment: done in a64768fdd5 — moved into `ColumnCloseResult::update_dictionary_location()`, so the call site is now `close.update_dictionary_location(data.dictionary_len())?`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
