alamb commented on code in PR #10020:
URL: https://github.com/apache/arrow-rs/pull/10020#discussion_r3343247741
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -603,20 +628,67 @@ impl ArrowWriterOptions {
}
}
-/// A single column chunk produced by [`ArrowColumnWriter`]
-#[derive(Default)]
+/// A single column chunk produced by [`ArrowColumnWriter`].
+///
+/// Holds the serialized page blobs (each page's header ‖ compressed data, in
+/// write order) in a [`PageStore`], plus the handles needed to read them back,
+/// in order, when the chunk is spliced into the output file.
struct ArrowColumnChunkData {
+ length: usize,
+ store: Box<dyn PageStore>,
+ keys: Vec<PageKey>,
Review Comment:
in theory this is one extra Vec per column (which is fine I think)
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -603,20 +628,67 @@ impl ArrowWriterOptions {
}
}
-/// A single column chunk produced by [`ArrowColumnWriter`]
-#[derive(Default)]
+/// A single column chunk produced by [`ArrowColumnWriter`].
+///
+/// Holds the serialized page blobs (each page's header ‖ compressed data, in
+/// write order) in a [`PageStore`], plus the handles needed to read them back,
+/// in order, when the chunk is spliced into the output file.
struct ArrowColumnChunkData {
+ length: usize,
+ store: Box<dyn PageStore>,
+ keys: Vec<PageKey>,
+}
+
+impl ArrowColumnChunkData {
+ fn new(store: Box<dyn PageStore>) -> Self {
+ Self {
+ length: 0,
+ store,
+ keys: Vec::new(),
+ }
+ }
+
+ /// Append a serialized blob to the store, recording its handle in write
+ /// order.
+ fn push(&mut self, value: Bytes) -> Result<()> {
+ let key = self.store.put(value)?;
+ self.keys.push(key);
+ Ok(())
+ }
+
+ /// Take every buffered blob back out of the store, in write order, ready
to
+ /// splice into the output file.
+ ///
+ /// TODO(spill): stream blobs one at a time into the splice instead of
Review Comment:
I think implementing this TODO will result in an API change (to
MaterializedColumnChunk) but since it is all internal that is probably fine
##########
parquet/src/column/page_store.rs:
##########
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable storage for completed, serialized page blobs.
+//!
+//! While a row group is being written the [`ArrowWriter`] must buffer every
+//! column's encoded pages, because Parquet requires each column chunk to be
+//! contiguous on disk while record batches arrive with all columns
interleaved.
Review Comment:
```suggestion
//! contiguous in the file while record batches arrive with all columns
interleaved.
```
##########
parquet/src/column/page_store.rs:
##########
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable storage for completed, serialized page blobs.
+//!
+//! While a row group is being written the [`ArrowWriter`] must buffer every
+//! column's encoded pages, because Parquet requires each column chunk to be
+//! contiguous on disk while record batches arrive with all columns
interleaved.
+//! By default that buffer lives on the heap, so the writer's peak memory grows
+//! with the row group size. A [`PageStore`] lets the buffer live somewhere
else
+//! — a local temp file, object storage, etc. — bounding peak write memory
+//! independently of the row group size.
+//!
+//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
+
+use std::fmt::Debug;
+
+use bytes::Bytes;
+
+use crate::errors::{ParquetError, Result};
+
+/// An opaque, store-allocated handle to a blob held by a [`PageStore`].
+///
+/// Handles are allocated by the store — densely and sequentially — and are
only
+/// meaningful to the store that produced them. The caller treats them as
opaque
+/// tokens and decides what they *mean* (ordering, which one is the dictionary
+/// page, etc.).
+///
+/// Letting the store allocate the handle (rather than the caller choosing
keys)
+/// lets each backend pick the cheapest possible locator with no hashing: an
+/// in-memory backend uses the handle as an index into a `Vec`, a temp-file
+/// backend as an index into a `Vec<(offset, len)>`, and so on.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct PageKey(pub(crate) u64);
+
+/// A pluggable store for completed, serialized page blobs.
+///
+/// The store is intentionally "dumb": it only maps an opaque [`PageKey`] to a
+/// blob of bytes. It knows nothing about pages, dictionaries, ordering, or
+/// offsets. The caller keeps the handles it gets back from [`put`](Self::put)
+/// and decides what they mean.
+///
+/// Each store instance is owned by a single column writer and mutated by one
+/// thread at a time (both methods take `&mut self`), so it needs no internal
+/// synchronization — hence only `Send`, not `Sync`.
+///
+/// The default ([`InMemoryPageStore`]) keeps blobs on the heap. Configure a
+/// different backend via
+///
[`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
+pub trait PageStore: Send {
+ /// Store `value`, returning a handle that can later be passed to
+ /// [`take`](Self::take).
+ fn put(&mut self, value: Bytes) -> Result<PageKey>;
+
+ /// Take back the blob previously stored under `key`.
+ ///
+ /// The caller takes ownership of the returned bytes and will **not**
request
+ /// `key` again, so the store may release any resources backing it —
eagerly
+ /// here, or when the store is dropped.
+ fn take(&mut self, key: PageKey) -> Result<Bytes>;
+}
+
+/// Creates a fresh [`PageStore`] for each column chunk.
+///
+/// See
+///
[`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
+pub trait PageStoreFactory: Send + Sync + Debug {
+ /// Create a new, empty [`PageStore`] for the leaf column at
`column_index`.
+ ///
+ /// `column_index` is a hint a backend may use to e.g. name spill files or
+ /// shard across a bounded pool; it carries no ordering or coordination
+ /// requirement.
+ fn create(&self, column_index: usize) -> Result<Box<dyn PageStore>>;
+}
+
+/// The default [`PageStore`], holding blobs on the heap in a `Vec<Bytes>`.
+///
+/// This is byte-for-byte equivalent to the writer's historical buffering
Review Comment:
I think the comment about historical writer behavior is good context for the
PR but not helpful for future readers. I think we can just say "peak memory
grows with row group size".
##########
parquet/src/column/page_store.rs:
##########
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable storage for completed, serialized page blobs.
+//!
+//! While a row group is being written the [`ArrowWriter`] must buffer every
+//! column's encoded pages, because Parquet requires each column chunk to be
+//! contiguous on disk while record batches arrive with all columns
interleaved.
+//! By default that buffer lives on the heap, so the writer's peak memory grows
+//! with the row group size. A [`PageStore`] lets the buffer live somewhere
else
+//! — a local temp file, object storage, etc. — bounding peak write memory
+//! independently of the row group size.
+//!
+//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
+
+use std::fmt::Debug;
+
+use bytes::Bytes;
+
+use crate::errors::{ParquetError, Result};
+
+/// An opaque, store-allocated handle to a blob held by a [`PageStore`].
+///
+/// Handles are allocated by the store — densely and sequentially — and are
only
+/// meaningful to the store that produced them. The caller treats them as
opaque
Review Comment:
This seems contradictory:
1. The caller treats them as opaque
2. Also the caller decides what they mean
🤔
I think it should just say "treats them as opaque"
##########
parquet/Cargo.toml:
##########
@@ -94,6 +94,7 @@ tokio = { version = "1.0", default-features = false, features
= ["macros", "rt-m
rand = { version = "0.9", default-features = false, features = ["std",
"std_rng", "thread_rng"] }
object_store = { workspace = true, features = ["azure", "fs"] }
sysinfo = { version = "0.38.1", default-features = false, features =
["system"] }
+dhat = { version = "0.3", default-features = false }
Review Comment:
Can we avoid a new dependency?
I realize it is a dev only, but it seems like a one man maintained crate
and is just one more thing to maintain / keep up to date:
https://crates.io/crates/dhat
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -603,52 +627,108 @@ impl ArrowWriterOptions {
}
}
-/// A single column chunk produced by [`ArrowColumnWriter`]
-#[derive(Default)]
+/// A single column chunk produced by [`ArrowColumnWriter`].
+///
+/// Holds the serialized page blobs (each page's header ‖ compressed data, in
+/// write order) in a [`PageStore`], plus the handles needed to read them back,
+/// in order, when the chunk is spliced into the output file.
struct ArrowColumnChunkData {
length: usize,
- data: Vec<Bytes>,
+ store: Box<dyn PageStore>,
+ keys: Vec<PageKey>,
+ /// The dictionary page's serialized blobs (header ‖ data), held in memory
+ /// rather than the store.
+ ///
+ /// A dictionary page is produced at most once and bounded by
Review Comment:
Why not just keep a list of Vec<PageKey> for consistency? It seems strange
not to allow Dictionary pages to be spilled (especially for misconfigured
writes where the dictionary page could be large) 🤔
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -603,52 +627,108 @@ impl ArrowWriterOptions {
}
}
-/// A single column chunk produced by [`ArrowColumnWriter`]
-#[derive(Default)]
+/// A single column chunk produced by [`ArrowColumnWriter`].
+///
+/// Holds the serialized page blobs (each page's header ‖ compressed data, in
+/// write order) in a [`PageStore`], plus the handles needed to read them back,
+/// in order, when the chunk is spliced into the output file.
struct ArrowColumnChunkData {
length: usize,
- data: Vec<Bytes>,
+ store: Box<dyn PageStore>,
+ keys: Vec<PageKey>,
+ /// The dictionary page's serialized blobs (header ‖ data), held in memory
+ /// rather than the store.
+ ///
+ /// A dictionary page is produced at most once and bounded by
+ /// `dict_page_size_limit`, but it must be written *first* in the chunk
even
+ /// though the data pages reach the writer before it (see
+ /// [`PageWriter::defers_dictionary_ordering`]). Spilling it would only
+ /// round-trip ~1 page to the backend and straight back, so it is kept here
+ /// and emitted ahead of the data pages at splice. Empty for non-dictionary
+ /// columns.
+ dictionary: Vec<Bytes>,
}
-impl Length for ArrowColumnChunkData {
- fn len(&self) -> u64 {
- self.length as _
+impl ArrowColumnChunkData {
+ fn new(store: Box<dyn PageStore>) -> Self {
+ Self {
+ length: 0,
+ store,
+ keys: Vec::new(),
+ dictionary: Vec::new(),
+ }
+ }
+
+ /// Append a data-page blob to the store, recording its handle in write
+ /// order.
+ fn push(&mut self, value: Bytes) -> Result<()> {
+ let key = self.store.put(value)?;
+ self.keys.push(key);
+ Ok(())
}
-}
-impl ChunkReader for ArrowColumnChunkData {
- type T = ArrowColumnChunkReader;
+ /// Retain a dictionary-page blob in memory (emitted first at splice).
+ fn push_dictionary(&mut self, value: Bytes) {
+ self.dictionary.push(value);
+ }
- fn get_read(&self, start: u64) -> Result<Self::T> {
- assert_eq!(start, 0); // Assume append_column writes all data in
one-shot
- Ok(ArrowColumnChunkReader(
- self.data.clone().into_iter().peekable(),
- ))
+ /// Total serialized size of the in-memory dictionary page, in bytes.
+ fn dictionary_len(&self) -> usize {
+ self.dictionary.iter().map(Bytes::len).sum()
}
- fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
- unimplemented!()
+ /// Bytes this chunk currently holds on the heap: whatever the store keeps
+ /// resident (zero for a spilling backend) plus the in-memory dictionary
+ /// page.
+ fn memory_size(&self) -> usize {
+ self.store.memory_size() + self.dictionary_len()
}
}
-/// A [`Read`] for [`ArrowColumnChunkData`]
-struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
+/// A streaming [`Read`] over one column chunk's buffered pages, in final file
+/// order: the in-memory dictionary page (if any) first, then the data pages.
+///
+/// Each data-page blob is taken back out of the [`PageStore`] *as it is
+/// consumed* and released immediately afterwards, so splicing a chunk into the
+/// output file never materializes more than a single page in memory at a time.
+/// This is what keeps the splice phase within the memory bound for a spilling
+/// backend (an in-memory store already holds the bytes, so it is unaffected).
+struct StreamingColumnChunkReader {
+ /// Dictionary-page blobs, emitted before any data page.
+ dictionary: IntoIter<Bytes>,
+ store: Box<dyn PageStore>,
+ keys: IntoIter<PageKey>,
+ /// The blob currently being drained into the output; emptied as it is
read.
+ current: Bytes,
+}
+
+impl StreamingColumnChunkReader {
+ fn new(data: ArrowColumnChunkData) -> Self {
+ Self {
+ dictionary: data.dictionary.into_iter(),
+ store: data.store,
+ keys: data.keys.into_iter(),
+ current: Bytes::new(),
+ }
+ }
+}
-impl Read for ArrowColumnChunkReader {
+impl Read for StreamingColumnChunkReader {
fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
Review Comment:
this is a blocking API FWIW (so it would be tough to connect this to an
object_store). It also basically requires an extra copy, which is non-ideal.
However, both properties are the same before and after this PR.
##########
parquet/src/column/writer/mod.rs:
##########
@@ -1271,7 +1282,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a,
E> {
};
// Check if we need to buffer data page or flush it to the sink
directly.
- if self.encoder.has_dictionary() {
+ //
+ // For dictionary-encoded columns the dictionary page must be written
+ // first, but it is not final until all values are seen, so completed
+ // data pages are normally buffered here until `close`. A page writer
+ // that defers final layout (the Arrow path) instead orders pages
itself
+ // at flush, so we stream the data pages straight through and never let
+ // them accumulate in memory.
+ if self.encoder.has_dictionary() &&
!self.page_writer.defers_dictionary_ordering() {
self.data_pages.push_back(compressed_page);
Review Comment:
it is kind of strange there are multiple levels of buffering going on --
both in the GenericColumnWriter::data_pages and (now) in the PageStore
I wonder if we can simplify the code and logic so that all the buffering is
deferred to the page store 🤔 Then the page store might not have to worry
about the different kind of pages at all
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -726,12 +814,35 @@ impl PageWriter for ArrowPageWriter {
spec.bytes_written = compressed_size as u64;
buf.length += compressed_size;
- buf.data.push(header);
- buf.data.push(data);
+ if spec.page_type == PageType::DICTIONARY_PAGE {
+ // Held in memory and emitted first at splice — see
+ // `ArrowColumnChunkData::dictionary`. The buffer-relative offset
in
+ // `spec` (the dictionary arrives after the data pages on this
path)
+ // is rewritten to its true, dictionary-first position at splice.
Review Comment:
This comment is confusing to me (why is it talking about buffer-relative
offsets?) I think the code would be clearer if the comment was removed
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -603,20 +628,67 @@ impl ArrowWriterOptions {
}
}
-/// A single column chunk produced by [`ArrowColumnWriter`]
-#[derive(Default)]
+/// A single column chunk produced by [`ArrowColumnWriter`].
+///
+/// Holds the serialized page blobs (each page's header ‖ compressed data, in
+/// write order) in a [`PageStore`], plus the handles needed to read them back,
+/// in order, when the chunk is spliced into the output file.
struct ArrowColumnChunkData {
+ length: usize,
+ store: Box<dyn PageStore>,
+ keys: Vec<PageKey>,
+}
+
+impl ArrowColumnChunkData {
+ fn new(store: Box<dyn PageStore>) -> Self {
+ Self {
+ length: 0,
+ store,
+ keys: Vec::new(),
+ }
+ }
+
+ /// Append a serialized blob to the store, recording its handle in write
+ /// order.
+ fn push(&mut self, value: Bytes) -> Result<()> {
+ let key = self.store.put(value)?;
+ self.keys.push(key);
+ Ok(())
+ }
+
+ /// Take every buffered blob back out of the store, in write order, ready
to
+ /// splice into the output file.
+ ///
+ /// TODO(spill): stream blobs one at a time into the splice instead of
Review Comment:
Crap -- I was reviewing commit by commit and left comments on an older
interface. Gaaa -- sorry -- will review just the latest
##########
parquet/src/column/page_store.rs:
##########
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable storage for completed, serialized page blobs.
+//!
+//! While a row group is being written the [`ArrowWriter`] must buffer every
+//! column's encoded pages, because Parquet requires each column chunk to be
+//! contiguous on disk while record batches arrive with all columns
interleaved.
+//! By default that buffer lives on the heap, so the writer's peak memory grows
+//! with the row group size. A [`PageStore`] lets the buffer live somewhere
else
+//! — a local temp file, object storage, etc. — bounding peak write memory
+//! independently of the row group size.
+//!
+//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
+
+use std::fmt::Debug;
+
+use bytes::Bytes;
+
+use crate::errors::{ParquetError, Result};
+
+/// An opaque, store-allocated handle to a blob held by a [`PageStore`].
+///
+/// Handles are allocated by the store — densely and sequentially — and are
only
+/// meaningful to the store that produced them. The caller treats them as
opaque
+/// tokens and decides what they *mean* (ordering, which one is the dictionary
+/// page, etc.).
+///
+/// Letting the store allocate the handle (rather than the caller choosing
keys)
+/// lets each backend pick the cheapest possible locator with no hashing: an
+/// in-memory backend uses the handle as an index into a `Vec`, a temp-file
+/// backend as an index into a `Vec<(offset, len)>`, and so on.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct PageKey(pub(crate) u64);
+
+/// A pluggable store for completed, serialized page blobs.
+///
+/// The store is intentionally "dumb": it only maps an opaque [`PageKey`] to a
+/// blob of bytes. It knows nothing about pages, dictionaries, ordering, or
+/// offsets. The caller keeps the handles it gets back from [`put`](Self::put)
+/// and decides what they mean.
+///
+/// Each store instance is owned by a single column writer and mutated by one
+/// thread at a time (both methods take `&mut self`), so it needs no internal
+/// synchronization — hence only `Send`, not `Sync`.
+///
+/// The default ([`InMemoryPageStore`]) keeps blobs on the heap. Configure a
+/// different backend via
+///
[`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
+pub trait PageStore: Send {
+ /// Store `value`, returning a handle that can later be passed to
+ /// [`take`](Self::take).
+ fn put(&mut self, value: Bytes) -> Result<PageKey>;
+
+ /// Take back the blob previously stored under `key`.
+ ///
+ /// The caller takes ownership of the returned bytes and will **not**
request
+ /// `key` again, so the store may release any resources backing it —
eagerly
+ /// here, or when the store is dropped.
+ fn take(&mut self, key: PageKey) -> Result<Bytes>;
+}
+
+/// Creates a fresh [`PageStore`] for each column chunk.
+///
+/// See
+///
[`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
+pub trait PageStoreFactory: Send + Sync + Debug {
+ /// Create a new, empty [`PageStore`] for the leaf column at
`column_index`.
+ ///
+ /// `column_index` is a hint a backend may use to e.g. name spill files or
+ /// shard across a bounded pool; it carries no ordering or coordination
+ /// requirement.
+ fn create(&self, column_index: usize) -> Result<Box<dyn PageStore>>;
+}
+
+/// The default [`PageStore`], holding blobs on the heap in a `Vec<Bytes>`.
+///
+/// This is byte-for-byte equivalent to the writer's historical buffering
Review Comment:
gah, also a comment on an earlier commit
##########
parquet/Cargo.toml:
##########
@@ -94,6 +94,7 @@ tokio = { version = "1.0", default-features = false, features
= ["macros", "rt-m
rand = { version = "0.9", default-features = false, features = ["std",
"std_rng", "thread_rng"] }
object_store = { workspace = true, features = ["azure", "fs"] }
sysinfo = { version = "0.38.1", default-features = false, features =
["system"] }
+dhat = { version = "0.3", default-features = false }
Review Comment:
The intro line of the Readme file also doesn't give me a lot of good
feelings: https://crates.io/crates/dhat
> Warning: This crate is experimental. It relies on implementation
techniques that are hard to keep working for 100% of configurations. It may
work fine for you, or it may crash, hang, or otherwise do the wrong thing. Its
maintenance is not a high priority of the author. Support requests such as
issues and pull requests may receive slow responses, or no response at all.
Sorry!
##########
parquet/tests/page_spill_memory.rs:
##########
Review Comment:
Can we please put this in an existing test file, rather than an entirely new
test binary (to save compilation speed)? I see it currently overrides the
global allocator 🤔
Maybe we can revisit if this test is really necessary (though I do see the
wisdom in including something like this)
##########
parquet/src/column/page_store.rs:
##########
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable storage for completed, serialized page blobs.
+//!
+//! While a row group is being written the [`ArrowWriter`] must buffer every
+//! column's encoded pages, because Parquet requires each column chunk to be
+//! contiguous on disk while record batches arrive with all columns
interleaved.
+//! By default that buffer lives on the heap, so the writer's peak memory grows
+//! with the row group size. A [`PageStore`] lets the buffer live somewhere
else
+//! — a local temp file, object storage, etc. — bounding peak write memory
+//! independently of the row group size.
+//!
+//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
+
+use std::fmt::Debug;
+
+use bytes::Bytes;
+
+use crate::errors::{ParquetError, Result};
+
+/// An opaque, store-allocated handle to a blob held by a [`PageStore`].
+///
+/// Handles are allocated by the store — densely and sequentially — and are
only
+/// meaningful to the store that produced them. The caller treats them as
opaque
+/// tokens and decides what they *mean* (ordering, which one is the dictionary
+/// page, etc.).
+///
+/// Letting the store allocate the handle (rather than the caller choosing
keys)
Review Comment:
While this is true, I am not sure how much the added rationale really helps
understanding or using the API. Opaque ids are pretty common I think
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -785,12 +896,39 @@ impl ArrowColumnChunk {
&mut self.close
}
- /// Calls [`SerializedRowGroupWriter::append_column`] with this column's
data
+ /// Splices this column's buffered pages into the row group, streaming them
+ /// back out of the [`PageStore`] one page at a time.
pub fn append_to_row_group<W: Write + Send>(
self,
writer: &mut SerializedRowGroupWriter<'_, W>,
) -> Result<()> {
- writer.append_column(&self.data, self.close)
+ let ArrowColumnChunk { data, mut close } = self;
+
+ // On the Arrow path the dictionary page is produced *after* the data
Review Comment:
I wonder if this would be easier to understand / discoverable if it were a
method on ColumnCloseResult,
so this code would look like
```rust
let close = close.update_dictionary_location(data.dictionary_len())
```
##########
parquet/src/column/writer/mod.rs:
##########
@@ -632,7 +632,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E>
{
/// of the current memory usage and not the final anticipated encoded size.
#[cfg(feature = "arrow")]
pub(crate) fn memory_size(&self) -> usize {
- self.column_metrics.total_bytes_written as usize +
self.encoder.estimated_memory_size()
+ // In-flight encoder buffers, plus any completed pages still held on
the
+ // heap: the dictionary-column data pages buffered here
(column-at-a-time
+ // path), plus whatever the page writer keeps resident. A page writer
+ // that spills completed pages off-heap reports far less than the bytes
+ // it was handed, so this tracks real memory rather than bytes written.
+ self.encoder.estimated_memory_size()
+ + self
+ .data_pages
+ .iter()
+ .map(|page| page.data().len())
Review Comment:
perhaps we could encapsulate this more like `page.memory_usage()` especially
since several of the Page variants have embedded Bytes
##########
parquet/src/column/page_store.rs:
##########
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable storage for completed, serialized page blobs.
+//!
+//! While a row group is being written the [`ArrowWriter`] must buffer every
+//! column's encoded pages, because Parquet requires each column chunk to be
+//! contiguous on disk while record batches arrive with all columns
interleaved.
+//! By default that buffer lives on the heap, so the writer's peak memory grows
+//! with the row group size. A [`PageStore`] lets the buffer live somewhere
else
+//! — a local temp file, object storage, etc. — bounding peak write memory
+//! independently of the row group size.
+//!
+//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
+
+use std::fmt::Debug;
+
+use bytes::Bytes;
+
+use crate::errors::{ParquetError, Result};
+
+/// An opaque, store-allocated handle to a blob held by a [`PageStore`].
+///
+/// Handles are allocated by the store — densely and sequentially — and are
only
+/// meaningful to the store that produced them. The caller treats them as
opaque
+/// tokens and decides what they *mean* (ordering, which one is the dictionary
+/// page, etc.).
+///
+/// Letting the store allocate the handle (rather than the caller choosing
keys)
+/// lets each backend pick the cheapest possible locator with no hashing: an
+/// in-memory backend uses the handle as an index into a `Vec`, a temp-file
+/// backend as an index into a `Vec<(offset, len)>`, and so on.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct PageKey(u64);
+
+impl PageKey {
+ /// Create a handle wrapping `raw`.
+ ///
+ /// A [`PageStore`] implementation calls this to mint the handle it returns
+ /// from [`put`](PageStore::put). The value is opaque to the caller, so a
+ /// store is free to use a dense counter, a packed locator, or anything
else
+ /// it can later resolve in [`take`](PageStore::take).
+ pub const fn new(raw: u64) -> Self {
+ Self(raw)
+ }
+
+ /// The raw value passed to [`new`](Self::new).
+ pub const fn get(self) -> u64 {
+ self.0
+ }
+}
+
+/// A pluggable store for completed, serialized page blobs.
+///
+/// The store is intentionally "dumb": it only maps an opaque [`PageKey`] to a
+/// blob of bytes. It knows nothing about pages, dictionaries, ordering, or
+/// offsets. The caller keeps the handles it gets back from [`put`](Self::put)
+/// and decides what they mean.
+///
+/// Each store instance is owned by a single column writer and mutated by one
+/// thread at a time (both methods take `&mut self`), so it needs no internal
+/// synchronization — hence only `Send`, not `Sync`.
+///
+/// The default ([`InMemoryPageStore`]) keeps blobs on the heap. Configure a
+/// different backend via
+///
[`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
+pub trait PageStore: Send {
+ /// Store `value`, returning a handle that can later be passed to
+ /// [`take`](Self::take).
+ fn put(&mut self, value: Bytes) -> Result<PageKey>;
+
+ /// Take back the blob previously stored under `key`.
+ ///
+ /// The caller takes ownership of the returned bytes and will **not**
request
+ /// `key` again, so the store may release any resources backing it —
eagerly
+ /// here, or when the store is dropped.
+ fn take(&mut self, key: PageKey) -> Result<Bytes>;
+
+ /// The number of bytes this store currently holds **in memory** (resident
+ /// on the heap), used to report the writer's memory footprint.
+ ///
+ /// The default is `0`, which is exactly right for a backend that moves
+ /// every blob off-heap (a temp file, object storage): the bytes it has
been
+ /// handed no longer occupy heap. The in-memory backend overrides this to
+ /// report its resident blobs. A backend that keeps a partial in-memory
+ /// buffer should report that buffer's size.
+ fn memory_size(&self) -> usize {
+ 0
+ }
+}
+
+/// Creates a fresh [`PageStore`] for each column chunk.
+///
+/// See
+///
[`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
+pub trait PageStoreFactory: Send + Sync + Debug {
+ /// Create a new, empty [`PageStore`] for the leaf column at
`column_index`.
+ ///
+ /// `column_index` is a hint a backend may use to e.g. name spill files or
+ /// shard across a bounded pool; it carries no ordering or coordination
+ /// requirement.
+ fn create(&self, column_index: usize) -> Result<Box<dyn PageStore>>;
Review Comment:
I suspect some implementations would like additional information here (like
the Schema or target type) so they only buffer string columns specially, for
example
Perhaps we can make the API a bit more future proof with an args struct
```rust
struct PageStoreArgs<'a> {
column_index: usize
}
impl <'a> PageStoreArgs<'a> {
// return the column index
fn column_index(&self) -> usize { .. }
}
```
That way we can add additional arguments in future minor releases without
breaking the API.
I also think it is OK not to do this now; we can refine the API in the
future based on our actual uses.
##########
parquet/src/column/page.rs:
##########
@@ -430,6 +430,40 @@ pub trait PageWriter: Send {
/// either data page or dictionary page.
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;
+ /// Whether this writer resolves the final page layout itself (at flush)
+ /// rather than committing bytes to their final position as pages arrive.
+ ///
+ /// The dictionary page of a column chunk must be written *first*, but it is
+ /// not finalized until every value has been seen. A writer that commits
+ /// bytes live (e.g. straight to a file) therefore relies on the column
+ /// writer buffering the dictionary-encoded data pages in memory until the
Review Comment:
See above -- it might make more sense to just remove the column writer's
dictionary page buffering and simplify this API.
##########
parquet/src/column/page_store.rs:
##########
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable storage for completed, serialized page blobs.
+//!
+//! While a row group is being written the [`ArrowWriter`] must buffer every
+//! column's encoded pages, because Parquet requires each column chunk to be
+//! contiguous on disk while record batches arrive with all columns interleaved.
+//! By default that buffer lives on the heap, so the writer's peak memory grows
+//! with the row group size. A [`PageStore`] lets the buffer live somewhere else
+//! — a local temp file, object storage, etc. — bounding peak write memory
+//! independently of the row group size.
+//!
+//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
+
+use std::fmt::Debug;
+
+use bytes::Bytes;
+
+use crate::errors::{ParquetError, Result};
+
+/// An opaque, store-allocated handle to a blob held by a [`PageStore`].
+///
+/// Handles are allocated by the store — densely and sequentially — and are only
+/// meaningful to the store that produced them. The caller treats them as opaque
+/// tokens and decides what they *mean* (ordering, which one is the dictionary
+/// page, etc.).
+///
+/// Letting the store allocate the handle (rather than the caller choosing keys)
+/// lets each backend pick the cheapest possible locator with no hashing: an
+/// in-memory backend uses the handle as an index into a `Vec`, a temp-file
+/// backend as an index into a `Vec<(offset, len)>`, and so on.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct PageKey(u64);
+
+impl PageKey {
+ /// Create a handle wrapping `raw`.
+ ///
+ /// A [`PageStore`] implementation calls this to mint the handle it returns
+ /// from [`put`](PageStore::put). The value is opaque to the caller, so a
+ /// store is free to use a dense counter, a packed locator, or anything else
+ /// it can later resolve in [`take`](PageStore::take).
+ pub const fn new(raw: u64) -> Self {
+ Self(raw)
+ }
+
+ /// The raw value passed to [`new`](Self::new).
+ pub const fn get(self) -> u64 {
+ self.0
+ }
+}
+
+/// A pluggable store for completed, serialized page blobs.
+///
+/// The store is intentionally "dumb": it only maps an opaque [`PageKey`] to a
+/// blob of bytes. It knows nothing about pages, dictionaries, ordering, or
+/// offsets. The caller keeps the handles it gets back from [`put`](Self::put)
+/// and decides what they mean.
+///
+/// Each store instance is owned by a single column writer and mutated by one
+/// thread at a time (both methods take `&mut self`), so it needs no internal
+/// synchronization — hence only `Send`, not `Sync`.
+///
+/// The default ([`InMemoryPageStore`]) keeps blobs on the heap. Configure a
+/// different backend via
+/// [`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
+pub trait PageStore: Send {
+ /// Store `value`, returning a handle that can later be passed to
+ /// [`take`](Self::take).
+ fn put(&mut self, value: Bytes) -> Result<PageKey>;
+
+ /// Take back the blob previously stored under `key`.
Review Comment:
This implementation does not guard against API abuse (like trying to take
the same element twice) 🤔
Perhaps as a future enhancement we could use `Vec<Option<Bytes>>` or
something similar.
Alternatively, given that the page store API seems like it will always
return all the pages, should the API avoid keys and just pass back
an iterator of pages? 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]