alamb commented on code in PR #10020:
URL: https://github.com/apache/arrow-rs/pull/10020#discussion_r3343247741


##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -603,20 +628,67 @@ impl ArrowWriterOptions {
     }
 }
 
-/// A single column chunk produced by [`ArrowColumnWriter`]
-#[derive(Default)]
+/// A single column chunk produced by [`ArrowColumnWriter`].
+///
+/// Holds the serialized page blobs (each page's header ‖ compressed data, in
+/// write order) in a [`PageStore`], plus the handles needed to read them back,
+/// in order, when the chunk is spliced into the output file.
 struct ArrowColumnChunkData {
+    length: usize,
+    store: Box<dyn PageStore>,
+    keys: Vec<PageKey>,

Review Comment:
   in theory this is one extra Vec per column (which is fine I think)



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -603,20 +628,67 @@ impl ArrowWriterOptions {
     }
 }
 
-/// A single column chunk produced by [`ArrowColumnWriter`]
-#[derive(Default)]
+/// A single column chunk produced by [`ArrowColumnWriter`].
+///
+/// Holds the serialized page blobs (each page's header ‖ compressed data, in
+/// write order) in a [`PageStore`], plus the handles needed to read them back,
+/// in order, when the chunk is spliced into the output file.
 struct ArrowColumnChunkData {
+    length: usize,
+    store: Box<dyn PageStore>,
+    keys: Vec<PageKey>,
+}
+
+impl ArrowColumnChunkData {
+    fn new(store: Box<dyn PageStore>) -> Self {
+        Self {
+            length: 0,
+            store,
+            keys: Vec::new(),
+        }
+    }
+
+    /// Append a serialized blob to the store, recording its handle in write
+    /// order.
+    fn push(&mut self, value: Bytes) -> Result<()> {
+        let key = self.store.put(value)?;
+        self.keys.push(key);
+        Ok(())
+    }
+
+    /// Take every buffered blob back out of the store, in write order, ready 
to
+    /// splice into the output file.
+    ///
+    /// TODO(spill): stream blobs one at a time into the splice instead of

Review Comment:
   I think implementing this TODO will result in an API change (to 
MaterializedColumnChunk) but since it is all internal that is probably fine



##########
parquet/src/column/page_store.rs:
##########
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable storage for completed, serialized page blobs.
+//!
+//! While a row group is being written the [`ArrowWriter`] must buffer every
+//! column's encoded pages, because Parquet requires each column chunk to be
+//! contiguous on disk while record batches arrive with all columns 
interleaved.

Review Comment:
   ```suggestion
   //! contiguous in the file while record batches arrive with all columns 
interleaved.
   ```



##########
parquet/src/column/page_store.rs:
##########
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable storage for completed, serialized page blobs.
+//!
+//! While a row group is being written the [`ArrowWriter`] must buffer every
+//! column's encoded pages, because Parquet requires each column chunk to be
+//! contiguous on disk while record batches arrive with all columns 
interleaved.
+//! By default that buffer lives on the heap, so the writer's peak memory grows
+//! with the row group size. A [`PageStore`] lets the buffer live somewhere 
else
+//! — a local temp file, object storage, etc. — bounding peak write memory
+//! independently of the row group size.
+//!
+//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
+
+use std::fmt::Debug;
+
+use bytes::Bytes;
+
+use crate::errors::{ParquetError, Result};
+
+/// An opaque, store-allocated handle to a blob held by a [`PageStore`].
+///
+/// Handles are allocated by the store — densely and sequentially — and are 
only
+/// meaningful to the store that produced them. The caller treats them as 
opaque
+/// tokens and decides what they *mean* (ordering, which one is the dictionary
+/// page, etc.).
+///
+/// Letting the store allocate the handle (rather than the caller choosing 
keys)
+/// lets each backend pick the cheapest possible locator with no hashing: an
+/// in-memory backend uses the handle as an index into a `Vec`, a temp-file
+/// backend as an index into a `Vec<(offset, len)>`, and so on.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct PageKey(pub(crate) u64);
+
+/// A pluggable store for completed, serialized page blobs.
+///
+/// The store is intentionally "dumb": it only maps an opaque [`PageKey`] to a
+/// blob of bytes. It knows nothing about pages, dictionaries, ordering, or
+/// offsets. The caller keeps the handles it gets back from [`put`](Self::put)
+/// and decides what they mean.
+///
+/// Each store instance is owned by a single column writer and mutated by one
+/// thread at a time (both methods take `&mut self`), so it needs no internal
+/// synchronization — hence only `Send`, not `Sync`.
+///
+/// The default ([`InMemoryPageStore`]) keeps blobs on the heap. Configure a
+/// different backend via
+/// 
[`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
+pub trait PageStore: Send {
+    /// Store `value`, returning a handle that can later be passed to
+    /// [`take`](Self::take).
+    fn put(&mut self, value: Bytes) -> Result<PageKey>;
+
+    /// Take back the blob previously stored under `key`.
+    ///
+    /// The caller takes ownership of the returned bytes and will **not** 
request
+    /// `key` again, so the store may release any resources backing it — 
eagerly
+    /// here, or when the store is dropped.
+    fn take(&mut self, key: PageKey) -> Result<Bytes>;
+}
+
+/// Creates a fresh [`PageStore`] for each column chunk.
+///
+/// See
+/// 
[`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
+pub trait PageStoreFactory: Send + Sync + Debug {
+    /// Create a new, empty [`PageStore`] for the leaf column at 
`column_index`.
+    ///
+    /// `column_index` is a hint a backend may use to e.g. name spill files or
+    /// shard across a bounded pool; it carries no ordering or coordination
+    /// requirement.
+    fn create(&self, column_index: usize) -> Result<Box<dyn PageStore>>;
+}
+
+/// The default [`PageStore`], holding blobs on the heap in a `Vec<Bytes>`.
+///
+/// This is byte-for-byte equivalent to the writer's historical buffering

Review Comment:
   I think the comment about historical writer behavior is good context fro the 
PR but not helpful for future readers. I think we can just say "peak memory 
grows with row group size".



##########
parquet/src/column/page_store.rs:
##########
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable storage for completed, serialized page blobs.
+//!
+//! While a row group is being written the [`ArrowWriter`] must buffer every
+//! column's encoded pages, because Parquet requires each column chunk to be
+//! contiguous on disk while record batches arrive with all columns 
interleaved.
+//! By default that buffer lives on the heap, so the writer's peak memory grows
+//! with the row group size. A [`PageStore`] lets the buffer live somewhere 
else
+//! — a local temp file, object storage, etc. — bounding peak write memory
+//! independently of the row group size.
+//!
+//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
+
+use std::fmt::Debug;
+
+use bytes::Bytes;
+
+use crate::errors::{ParquetError, Result};
+
+/// An opaque, store-allocated handle to a blob held by a [`PageStore`].
+///
+/// Handles are allocated by the store — densely and sequentially — and are 
only
+/// meaningful to the store that produced them. The caller treats them as 
opaque

Review Comment:
   This seems contradictory:
   1. The caller treats them as opaque
   2. Also the caller decides what they mean 
   
   🤔 
   
   I think it should just say "treats them as opaque" 



##########
parquet/Cargo.toml:
##########
@@ -94,6 +94,7 @@ tokio = { version = "1.0", default-features = false, features 
= ["macros", "rt-m
 rand = { version = "0.9", default-features = false, features = ["std", 
"std_rng", "thread_rng"] }
 object_store = { workspace = true, features = ["azure", "fs"] }
 sysinfo = { version = "0.38.1", default-features = false, features = 
["system"] }
+dhat = { version = "0.3", default-features = false }

Review Comment:
   Can we avoid a new dependency? 
   
   I realize it is a dev only, but  it seems like a one man maintained crate 
and is just one more thing to maintain / keep up to date: 
https://crates.io/crates/dhat



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -603,52 +627,108 @@ impl ArrowWriterOptions {
     }
 }
 
-/// A single column chunk produced by [`ArrowColumnWriter`]
-#[derive(Default)]
+/// A single column chunk produced by [`ArrowColumnWriter`].
+///
+/// Holds the serialized page blobs (each page's header ‖ compressed data, in
+/// write order) in a [`PageStore`], plus the handles needed to read them back,
+/// in order, when the chunk is spliced into the output file.
 struct ArrowColumnChunkData {
     length: usize,
-    data: Vec<Bytes>,
+    store: Box<dyn PageStore>,
+    keys: Vec<PageKey>,
+    /// The dictionary page's serialized blobs (header ‖ data), held in memory
+    /// rather than the store.
+    ///
+    /// A dictionary page is produced at most once and bounded by

Review Comment:
   Why not just keep a list of Vec<PageKey> for consistency? It seems strange 
not to allow Dictionary pages to be spilled (especially for misconfigured 
writes where the dictionary page could be large) 🤔 



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -603,52 +627,108 @@ impl ArrowWriterOptions {
     }
 }
 
-/// A single column chunk produced by [`ArrowColumnWriter`]
-#[derive(Default)]
+/// A single column chunk produced by [`ArrowColumnWriter`].
+///
+/// Holds the serialized page blobs (each page's header ‖ compressed data, in
+/// write order) in a [`PageStore`], plus the handles needed to read them back,
+/// in order, when the chunk is spliced into the output file.
 struct ArrowColumnChunkData {
     length: usize,
-    data: Vec<Bytes>,
+    store: Box<dyn PageStore>,
+    keys: Vec<PageKey>,
+    /// The dictionary page's serialized blobs (header ‖ data), held in memory
+    /// rather than the store.
+    ///
+    /// A dictionary page is produced at most once and bounded by
+    /// `dict_page_size_limit`, but it must be written *first* in the chunk 
even
+    /// though the data pages reach the writer before it (see
+    /// [`PageWriter::defers_dictionary_ordering`]). Spilling it would only
+    /// round-trip ~1 page to the backend and straight back, so it is kept here
+    /// and emitted ahead of the data pages at splice. Empty for non-dictionary
+    /// columns.
+    dictionary: Vec<Bytes>,
 }
 
-impl Length for ArrowColumnChunkData {
-    fn len(&self) -> u64 {
-        self.length as _
+impl ArrowColumnChunkData {
+    fn new(store: Box<dyn PageStore>) -> Self {
+        Self {
+            length: 0,
+            store,
+            keys: Vec::new(),
+            dictionary: Vec::new(),
+        }
+    }
+
+    /// Append a data-page blob to the store, recording its handle in write
+    /// order.
+    fn push(&mut self, value: Bytes) -> Result<()> {
+        let key = self.store.put(value)?;
+        self.keys.push(key);
+        Ok(())
     }
-}
 
-impl ChunkReader for ArrowColumnChunkData {
-    type T = ArrowColumnChunkReader;
+    /// Retain a dictionary-page blob in memory (emitted first at splice).
+    fn push_dictionary(&mut self, value: Bytes) {
+        self.dictionary.push(value);
+    }
 
-    fn get_read(&self, start: u64) -> Result<Self::T> {
-        assert_eq!(start, 0); // Assume append_column writes all data in 
one-shot
-        Ok(ArrowColumnChunkReader(
-            self.data.clone().into_iter().peekable(),
-        ))
+    /// Total serialized size of the in-memory dictionary page, in bytes.
+    fn dictionary_len(&self) -> usize {
+        self.dictionary.iter().map(Bytes::len).sum()
     }
 
-    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
-        unimplemented!()
+    /// Bytes this chunk currently holds on the heap: whatever the store keeps
+    /// resident (zero for a spilling backend) plus the in-memory dictionary
+    /// page.
+    fn memory_size(&self) -> usize {
+        self.store.memory_size() + self.dictionary_len()
     }
 }
 
-/// A [`Read`] for [`ArrowColumnChunkData`]
-struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
+/// A streaming [`Read`] over one column chunk's buffered pages, in final file
+/// order: the in-memory dictionary page (if any) first, then the data pages.
+///
+/// Each data-page blob is taken back out of the [`PageStore`] *as it is
+/// consumed* and released immediately afterwards, so splicing a chunk into the
+/// output file never materializes more than a single page in memory at a time.
+/// This is what keeps the splice phase within the memory bound for a spilling
+/// backend (an in-memory store already holds the bytes, so it is unaffected).
+struct StreamingColumnChunkReader {
+    /// Dictionary-page blobs, emitted before any data page.
+    dictionary: IntoIter<Bytes>,
+    store: Box<dyn PageStore>,
+    keys: IntoIter<PageKey>,
+    /// The blob currently being drained into the output; emptied as it is 
read.
+    current: Bytes,
+}
+
+impl StreamingColumnChunkReader {
+    fn new(data: ArrowColumnChunkData) -> Self {
+        Self {
+            dictionary: data.dictionary.into_iter(),
+            store: data.store,
+            keys: data.keys.into_iter(),
+            current: Bytes::new(),
+        }
+    }
+}
 
-impl Read for ArrowColumnChunkReader {
+impl Read for StreamingColumnChunkReader {
     fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {

Review Comment:
   this is a blocking API FWIW (so it would be tough to connect this to an 
object_store). It also basically requires an extra copy, which is non ideal
   
   However, both properties are the same before and after this PR



##########
parquet/src/column/writer/mod.rs:
##########
@@ -1271,7 +1282,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, 
E> {
         };
 
         // Check if we need to buffer data page or flush it to the sink 
directly.
-        if self.encoder.has_dictionary() {
+        //
+        // For dictionary-encoded columns the dictionary page must be written
+        // first, but it is not final until all values are seen, so completed
+        // data pages are normally buffered here until `close`. A page writer
+        // that defers final layout (the Arrow path) instead orders pages 
itself
+        // at flush, so we stream the data pages straight through and never let
+        // them accumulate in memory.
+        if self.encoder.has_dictionary() && 
!self.page_writer.defers_dictionary_ordering() {
             self.data_pages.push_back(compressed_page);

Review Comment:
   it is kind of strange there are multiple levels of buffering going on -- 
both in the GenericColumWriter::data_pages and (now) in the PageStore
   
   I wonder if we can simplify the code and logic so that all the buffering is 
deferred to the page store  🤔  Then the page store might not have to worry 
about the different kind of pages at all



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -726,12 +814,35 @@ impl PageWriter for ArrowPageWriter {
         spec.bytes_written = compressed_size as u64;
 
         buf.length += compressed_size;
-        buf.data.push(header);
-        buf.data.push(data);
+        if spec.page_type == PageType::DICTIONARY_PAGE {
+            // Held in memory and emitted first at splice — see
+            // `ArrowColumnChunkData::dictionary`. The buffer-relative offset 
in
+            // `spec` (the dictionary arrives after the data pages on this 
path)
+            // is rewritten to its true, dictionary-first position at splice.

Review Comment:
   This comment is confusing to me (why is it talking about buffer-relative 
offsets?) I think the code would be clearer if the comment was removed



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -603,20 +628,67 @@ impl ArrowWriterOptions {
     }
 }
 
-/// A single column chunk produced by [`ArrowColumnWriter`]
-#[derive(Default)]
+/// A single column chunk produced by [`ArrowColumnWriter`].
+///
+/// Holds the serialized page blobs (each page's header ‖ compressed data, in
+/// write order) in a [`PageStore`], plus the handles needed to read them back,
+/// in order, when the chunk is spliced into the output file.
 struct ArrowColumnChunkData {
+    length: usize,
+    store: Box<dyn PageStore>,
+    keys: Vec<PageKey>,
+}
+
+impl ArrowColumnChunkData {
+    fn new(store: Box<dyn PageStore>) -> Self {
+        Self {
+            length: 0,
+            store,
+            keys: Vec::new(),
+        }
+    }
+
+    /// Append a serialized blob to the store, recording its handle in write
+    /// order.
+    fn push(&mut self, value: Bytes) -> Result<()> {
+        let key = self.store.put(value)?;
+        self.keys.push(key);
+        Ok(())
+    }
+
+    /// Take every buffered blob back out of the store, in write order, ready 
to
+    /// splice into the output file.
+    ///
+    /// TODO(spill): stream blobs one at a time into the splice instead of

Review Comment:
   Crap -- I was reviewing commit by commit and left comments on an older 
interface. Gaaa -- sorry -- will review just the latest



##########
parquet/src/column/page_store.rs:
##########
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable storage for completed, serialized page blobs.
+//!
+//! While a row group is being written the [`ArrowWriter`] must buffer every
+//! column's encoded pages, because Parquet requires each column chunk to be
+//! contiguous on disk while record batches arrive with all columns 
interleaved.
+//! By default that buffer lives on the heap, so the writer's peak memory grows
+//! with the row group size. A [`PageStore`] lets the buffer live somewhere 
else
+//! — a local temp file, object storage, etc. — bounding peak write memory
+//! independently of the row group size.
+//!
+//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
+
+use std::fmt::Debug;
+
+use bytes::Bytes;
+
+use crate::errors::{ParquetError, Result};
+
+/// An opaque, store-allocated handle to a blob held by a [`PageStore`].
+///
+/// Handles are allocated by the store — densely and sequentially — and are 
only
+/// meaningful to the store that produced them. The caller treats them as 
opaque
+/// tokens and decides what they *mean* (ordering, which one is the dictionary
+/// page, etc.).
+///
+/// Letting the store allocate the handle (rather than the caller choosing 
keys)
+/// lets each backend pick the cheapest possible locator with no hashing: an
+/// in-memory backend uses the handle as an index into a `Vec`, a temp-file
+/// backend as an index into a `Vec<(offset, len)>`, and so on.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct PageKey(pub(crate) u64);
+
+/// A pluggable store for completed, serialized page blobs.
+///
+/// The store is intentionally "dumb": it only maps an opaque [`PageKey`] to a
+/// blob of bytes. It knows nothing about pages, dictionaries, ordering, or
+/// offsets. The caller keeps the handles it gets back from [`put`](Self::put)
+/// and decides what they mean.
+///
+/// Each store instance is owned by a single column writer and mutated by one
+/// thread at a time (both methods take `&mut self`), so it needs no internal
+/// synchronization — hence only `Send`, not `Sync`.
+///
+/// The default ([`InMemoryPageStore`]) keeps blobs on the heap. Configure a
+/// different backend via
+/// 
[`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
+pub trait PageStore: Send {
+    /// Store `value`, returning a handle that can later be passed to
+    /// [`take`](Self::take).
+    fn put(&mut self, value: Bytes) -> Result<PageKey>;
+
+    /// Take back the blob previously stored under `key`.
+    ///
+    /// The caller takes ownership of the returned bytes and will **not** 
request
+    /// `key` again, so the store may release any resources backing it — 
eagerly
+    /// here, or when the store is dropped.
+    fn take(&mut self, key: PageKey) -> Result<Bytes>;
+}
+
+/// Creates a fresh [`PageStore`] for each column chunk.
+///
+/// See
+/// 
[`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
+pub trait PageStoreFactory: Send + Sync + Debug {
+    /// Create a new, empty [`PageStore`] for the leaf column at 
`column_index`.
+    ///
+    /// `column_index` is a hint a backend may use to e.g. name spill files or
+    /// shard across a bounded pool; it carries no ordering or coordination
+    /// requirement.
+    fn create(&self, column_index: usize) -> Result<Box<dyn PageStore>>;
+}
+
+/// The default [`PageStore`], holding blobs on the heap in a `Vec<Bytes>`.
+///
+/// This is byte-for-byte equivalent to the writer's historical buffering

Review Comment:
   gah, also a comment on an earlier commit



##########
parquet/Cargo.toml:
##########
@@ -94,6 +94,7 @@ tokio = { version = "1.0", default-features = false, features 
= ["macros", "rt-m
 rand = { version = "0.9", default-features = false, features = ["std", 
"std_rng", "thread_rng"] }
 object_store = { workspace = true, features = ["azure", "fs"] }
 sysinfo = { version = "0.38.1", default-features = false, features = 
["system"] }
+dhat = { version = "0.3", default-features = false }

Review Comment:
   The intro line of the Readme file also doesn't give me a lot of good 
feelings: https://crates.io/crates/dhat
   
   > Warning: This crate is experimental. It relies on implementation 
techniques that are hard to keep working for 100% of configurations. It may 
work fine for you, or it may crash, hang, or otherwise do the wrong thing. Its 
maintenance is not a high priority of the author. Support requests such as 
issues and pull requests may receive slow responses, or no response at all. 
Sorry!



##########
parquet/tests/page_spill_memory.rs:
##########


Review Comment:
   Can we please put this in an existing test file, rather than an entirely new 
test binary (to save compilation speed)? I see it currently overrides the 
global allocator 🤔 
   
   Maybe we can revisit if this test is really necessary (though I do see the 
wisdom in incuding something like this)



##########
parquet/src/column/page_store.rs:
##########
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable storage for completed, serialized page blobs.
+//!
+//! While a row group is being written the [`ArrowWriter`] must buffer every
+//! column's encoded pages, because Parquet requires each column chunk to be
+//! contiguous on disk while record batches arrive with all columns 
interleaved.
+//! By default that buffer lives on the heap, so the writer's peak memory grows
+//! with the row group size. A [`PageStore`] lets the buffer live somewhere 
else
+//! — a local temp file, object storage, etc. — bounding peak write memory
+//! independently of the row group size.
+//!
+//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
+
+use std::fmt::Debug;
+
+use bytes::Bytes;
+
+use crate::errors::{ParquetError, Result};
+
+/// An opaque, store-allocated handle to a blob held by a [`PageStore`].
+///
+/// Handles are allocated by the store — densely and sequentially — and are 
only
+/// meaningful to the store that produced them. The caller treats them as 
opaque
+/// tokens and decides what they *mean* (ordering, which one is the dictionary
+/// page, etc.).
+///
+/// Letting the store allocate the handle (rather than the caller choosing 
keys)

Review Comment:
   While this is true, I am not sure how much the added rationale really helps 
understanding or using the API. Opaque ids are pretty common I think



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -785,12 +896,39 @@ impl ArrowColumnChunk {
         &mut self.close
     }
 
-    /// Calls [`SerializedRowGroupWriter::append_column`] with this column's 
data
+    /// Splices this column's buffered pages into the row group, streaming them
+    /// back out of the [`PageStore`] one page at a time.
     pub fn append_to_row_group<W: Write + Send>(
         self,
         writer: &mut SerializedRowGroupWriter<'_, W>,
     ) -> Result<()> {
-        writer.append_column(&self.data, self.close)
+        let ArrowColumnChunk { data, mut close } = self;
+
+        // On the Arrow path the dictionary page is produced *after* the data

Review Comment:
   I wonder if this woudl be easier to understand / discvoverable if it were a 
method on ColumnCloseResult 
   
   so this code would look like
   ```rust
   let close = close.update_dictionary_location(data.dictionary_len())
   ```
   
   



##########
parquet/src/column/writer/mod.rs:
##########
@@ -632,7 +632,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> 
{
     /// of the current memory usage and not the final anticipated encoded size.
     #[cfg(feature = "arrow")]
     pub(crate) fn memory_size(&self) -> usize {
-        self.column_metrics.total_bytes_written as usize + 
self.encoder.estimated_memory_size()
+        // In-flight encoder buffers, plus any completed pages still held on 
the
+        // heap: the dictionary-column data pages buffered here 
(column-at-a-time
+        // path), plus whatever the page writer keeps resident. A page writer
+        // that spills completed pages off-heap reports far less than the bytes
+        // it was handed, so this tracks real memory rather than bytes written.
+        self.encoder.estimated_memory_size()
+            + self
+                .data_pages
+                .iter()
+                .map(|page| page.data().len())

Review Comment:
   perhaps we could encapsulate this more like `page.memory_usage()` especially 
since several of the Page variants have embedded Bytes



##########
parquet/src/column/page_store.rs:
##########
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable storage for completed, serialized page blobs.
+//!
+//! While a row group is being written the [`ArrowWriter`] must buffer every
+//! column's encoded pages, because Parquet requires each column chunk to be
+//! contiguous on disk while record batches arrive with all columns 
interleaved.
+//! By default that buffer lives on the heap, so the writer's peak memory grows
+//! with the row group size. A [`PageStore`] lets the buffer live somewhere 
else
+//! — a local temp file, object storage, etc. — bounding peak write memory
+//! independently of the row group size.
+//!
+//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
+
+use std::fmt::Debug;
+
+use bytes::Bytes;
+
+use crate::errors::{ParquetError, Result};
+
+/// An opaque, store-allocated handle to a blob held by a [`PageStore`].
+///
+/// Handles are allocated by the store — densely and sequentially — and are 
only
+/// meaningful to the store that produced them. The caller treats them as 
opaque
+/// tokens and decides what they *mean* (ordering, which one is the dictionary
+/// page, etc.).
+///
+/// Letting the store allocate the handle (rather than the caller choosing 
keys)
+/// lets each backend pick the cheapest possible locator with no hashing: an
+/// in-memory backend uses the handle as an index into a `Vec`, a temp-file
+/// backend as an index into a `Vec<(offset, len)>`, and so on.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct PageKey(u64);
+
+impl PageKey {
+    /// Create a handle wrapping `raw`.
+    ///
+    /// A [`PageStore`] implementation calls this to mint the handle it returns
+    /// from [`put`](PageStore::put). The value is opaque to the caller, so a
+    /// store is free to use a dense counter, a packed locator, or anything 
else
+    /// it can later resolve in [`take`](PageStore::take).
+    pub const fn new(raw: u64) -> Self {
+        Self(raw)
+    }
+
+    /// The raw value passed to [`new`](Self::new).
+    pub const fn get(self) -> u64 {
+        self.0
+    }
+}
+
+/// A pluggable store for completed, serialized page blobs.
+///
+/// The store is intentionally "dumb": it only maps an opaque [`PageKey`] to a
+/// blob of bytes. It knows nothing about pages, dictionaries, ordering, or
+/// offsets. The caller keeps the handles it gets back from [`put`](Self::put)
+/// and decides what they mean.
+///
+/// Each store instance is owned by a single column writer and mutated by one
+/// thread at a time (both methods take `&mut self`), so it needs no internal
+/// synchronization — hence only `Send`, not `Sync`.
+///
+/// The default ([`InMemoryPageStore`]) keeps blobs on the heap. Configure a
+/// different backend via
+/// 
[`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
+pub trait PageStore: Send {
+    /// Store `value`, returning a handle that can later be passed to
+    /// [`take`](Self::take).
+    fn put(&mut self, value: Bytes) -> Result<PageKey>;
+
+    /// Take back the blob previously stored under `key`.
+    ///
+    /// The caller takes ownership of the returned bytes and will **not** 
request
+    /// `key` again, so the store may release any resources backing it — 
eagerly
+    /// here, or when the store is dropped.
+    fn take(&mut self, key: PageKey) -> Result<Bytes>;
+
+    /// The number of bytes this store currently holds **in memory** (resident
+    /// on the heap), used to report the writer's memory footprint.
+    ///
+    /// The default is `0`, which is exactly right for a backend that moves
+    /// every blob off-heap (a temp file, object storage): the bytes it has 
been
+    /// handed no longer occupy heap. The in-memory backend overrides this to
+    /// report its resident blobs. A backend that keeps a partial in-memory
+    /// buffer should report that buffer's size.
+    fn memory_size(&self) -> usize {
+        0
+    }
+}
+
+/// Creates a fresh [`PageStore`] for each column chunk.
+///
+/// See
+/// 
[`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
+pub trait PageStoreFactory: Send + Sync + Debug {
+    /// Create a new, empty [`PageStore`] for the leaf column at 
`column_index`.
+    ///
+    /// `column_index` is a hint a backend may use to e.g. name spill files or
+    /// shard across a bounded pool; it carries no ordering or coordination
+    /// requirement.
+    fn create(&self, column_index: usize) -> Result<Box<dyn PageStore>>;

Review Comment:
   I suspect some implementations would like additional information here (like 
the Schema or target type) so they only buffer string columns specially, for 
example
   
   Perhaps we can make the API a bit more future proof with an args struct
   
   ```rust
   struct PageStoreArgs<'a> {
     column_index: usize
   }
   
   impl <'a> PageStoreArgs<'a>  {
     // return the column index
     fn column_index(&self) -> usize { .. } 
   }
   ```
   
   That way we can add additional information arguments in future minor releases
   
   I also think it is ok not to do this now and we can refine the API in the 
future given our actual uses
   
   



##########
parquet/src/column/page.rs:
##########
@@ -430,6 +430,40 @@ pub trait PageWriter: Send {
     /// either data page or dictionary page.
     fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;
 
+    /// Whether this writer resolves the final page layout itself (at flush)
+    /// rather than committing bytes to their final position as pages arrive.
+    ///
+    /// The dictionary page of a column chunk must be written *first*, but it 
is
+    /// not finalized until every value has been seen. A writer that commits
+    /// bytes live (e.g. straight to a file) therefore relies on the column
+    /// writer buffering the dictionary-encoded data pages in memory until the

Review Comment:
   see above -- it might make more sense to just remove the column writer 
dictionary page buffering and simplify this API



##########
parquet/src/column/page_store.rs:
##########
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable storage for completed, serialized page blobs.
+//!
+//! While a row group is being written the [`ArrowWriter`] must buffer every
+//! column's encoded pages, because Parquet requires each column chunk to be
+//! contiguous on disk while record batches arrive with all columns 
interleaved.
+//! By default that buffer lives on the heap, so the writer's peak memory grows
+//! with the row group size. A [`PageStore`] lets the buffer live somewhere 
else
+//! — a local temp file, object storage, etc. — bounding peak write memory
+//! independently of the row group size.
+//!
+//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
+
+use std::fmt::Debug;
+
+use bytes::Bytes;
+
+use crate::errors::{ParquetError, Result};
+
+/// An opaque, store-allocated handle to a blob held by a [`PageStore`].
+///
+/// Handles are allocated by the store — densely and sequentially — and are 
only
+/// meaningful to the store that produced them. The caller treats them as 
opaque
+/// tokens and decides what they *mean* (ordering, which one is the dictionary
+/// page, etc.).
+///
+/// Letting the store allocate the handle (rather than the caller choosing 
keys)
+/// lets each backend pick the cheapest possible locator with no hashing: an
+/// in-memory backend uses the handle as an index into a `Vec`, a temp-file
+/// backend as an index into a `Vec<(offset, len)>`, and so on.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct PageKey(u64);
+
+impl PageKey {
+    /// Create a handle wrapping `raw`.
+    ///
+    /// A [`PageStore`] implementation calls this to mint the handle it returns
+    /// from [`put`](PageStore::put). The value is opaque to the caller, so a
+    /// store is free to use a dense counter, a packed locator, or anything 
else
+    /// it can later resolve in [`take`](PageStore::take).
+    pub const fn new(raw: u64) -> Self {
+        Self(raw)
+    }
+
+    /// The raw value passed to [`new`](Self::new).
+    pub const fn get(self) -> u64 {
+        self.0
+    }
+}
+
+/// A pluggable store for completed, serialized page blobs.
+///
+/// The store is intentionally "dumb": it only maps an opaque [`PageKey`] to a
+/// blob of bytes. It knows nothing about pages, dictionaries, ordering, or
+/// offsets. The caller keeps the handles it gets back from [`put`](Self::put)
+/// and decides what they mean.
+///
+/// Each store instance is owned by a single column writer and mutated by one
+/// thread at a time (both methods take `&mut self`), so it needs no internal
+/// synchronization — hence only `Send`, not `Sync`.
+///
+/// The default ([`InMemoryPageStore`]) keeps blobs on the heap. Configure a
+/// different backend via
+/// 
[`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
+pub trait PageStore: Send {
+    /// Store `value`, returning a handle that can later be passed to
+    /// [`take`](Self::take).
+    fn put(&mut self, value: Bytes) -> Result<PageKey>;
+
+    /// Take back the blob previously stored under `key`.

Review Comment:
   this implementation does not guard against API abuse (like trying to take 
the same element twice) 🤔 
   Perhaps as a future enhancement we could use `Vec<Option<Bytes>>` or 
something 
   
   Alternately given the page store API seems like it is going to always be 
returning all the pages, should we have the API avoid keys and just pass back 
an iterator of pages 🤔 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to