adriangb commented on code in PR #9968:
URL: https://github.com/apache/arrow-rs/pull/9968#discussion_r3251165534


##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -156,15 +254,29 @@ impl ParquetPushDecoderBuilder {
     /// See [`ArrowReaderMetadata::try_new`] for how to create the metadata 
from
     /// the Parquet metadata and reader options.
     pub fn new_with_metadata(arrow_reader_metadata: ArrowReaderMetadata) -> 
Self {
-        Self::new_builder(NoInput, arrow_reader_metadata)
+        Self::new_builder(NoInput::default(), arrow_reader_metadata)
+    }
+
+    /// Reuse a [`PushBuffers`] when [`build`](Self::build)ing the decoder so
+    /// that bytes already fetched are not requested again.
+    ///
+    /// This is how [`ParquetPushDecoder::into_builder`] carries a decoder's

Review Comment:
   I assume you mean exposing `PushBuffers` in the public API? I've done that 
but just wanted to confirm.



##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -109,19 +109,117 @@ use std::sync::Arc;
 ///         }
 ///     }
 /// ```
+///
+/// # Adaptive scans
+///
+/// The scan strategy is not fixed once [`build`](Self::build) is called: it
+/// can be changed *while decoding*, at row-group boundaries.
+///
+/// The important API for this is [`ParquetPushDecoder::try_next_reader`].
+/// Unlike [`try_decode`](ParquetPushDecoder::try_decode), which barrels
+/// straight through row-group boundaries, `try_next_reader` returns once per
+/// row group — leaving a clean window *between* row groups. At any such
+/// boundary, [`ParquetPushDecoder::into_builder`] hands back a
+/// `ParquetPushDecoderBuilder` for the row groups not yet decoded. Change any
+/// option on it (projection, row filter, row selection policy, …) and
+/// [`build`](Self::build) a fresh decoder that resumes from the next row
+/// group. This is how a query engine promotes or demotes filters — for
+/// example turning a row filter on or off — based on the selectivity observed
+/// in the row groups decoded so far.
+///
+/// ```
+/// # use std::ops::Range;
+/// # use std::sync::Arc;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ProjectionMask;
+/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
+/// # use parquet::file::properties::WriterProperties;
+/// # let file_bytes = {
+/// #   let batch = record_batch!(
+/// #       ("a", Int32, [1, 2, 3, 4, 5, 6]),
+/// #       ("b", Int32, [6, 5, 4, 3, 2, 1])
+/// #   ).unwrap();
+/// #   // Small row groups so the test file has two of them.
+/// #   let props = 
WriterProperties::builder().set_max_row_group_row_count(Some(3)).build();
+/// #   let mut buffer = vec![];
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), 
Some(props)).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// # let get_range = |r: &Range<u64>| file_bytes.slice(r.start as 
usize..r.end as usize);
+/// # let file_length = file_bytes.len() as u64;
+/// # let mut metadata_decoder = 
ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
+/// # metadata_decoder.push_ranges(vec![0..file_length], 
vec![file_bytes.clone()]).unwrap();
+/// # let DecodeResult::Data(parquet_metadata) = 
metadata_decoder.try_decode().unwrap() else { panic!() };
+/// # let parquet_metadata = Arc::new(parquet_metadata);
+/// let mut decoder = 
ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata)
+///     .unwrap()
+///     .build()
+///     .unwrap();
+///
+/// // Drive the decoder one row group at a time with `try_next_reader`.
+/// loop {
+///     match decoder.try_next_reader().unwrap() {
+///         DecodeResult::NeedsData(ranges) => {
+///             // Fetch and hand over the bytes the decoder asked for.
+///             let data = ranges.iter().map(|r| get_range(r)).collect();
+///             decoder.push_ranges(ranges, data).unwrap();
+///         }
+///         DecodeResult::Data(reader) => {
+///             // Decode this row group's batches.
+///             for batch in reader {
+///                 assert!(batch.unwrap().num_rows() > 0);
+///             }
+///             // We are now at a row-group boundary. Based on whatever stats
+///             // were gathered, optionally change strategy for the row groups
+///             // still to come: drop or promote a row filter, narrow or widen
+///             // the projection, etc.
+///             if decoder.is_at_row_group_boundary() && 
decoder.row_groups_remaining() > 0 {
+///                 let builder = decoder.into_builder().unwrap();
+///                 // e.g. column "b" turned out not to be needed.
+///                 let projection = 
ProjectionMask::columns(builder.parquet_schema(), ["a"]);
+///                 decoder = 
builder.with_projection(projection).build().unwrap();
+///             }
+///         }
+///         DecodeResult::Finished => break,
+///     }
+/// }
+/// ```
 pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<NoInput>;
 
-/// Type that represents "No input" for the [`ParquetPushDecoderBuilder`]
+/// The `input` slot of a [`ParquetPushDecoderBuilder`].
+///
+/// [`ArrowReaderBuilder`] is shared by the sync, async, and push decoders.
+/// The sync and async builders put a real input there — a file or async
+/// reader — to read from. The push decoder has no reader, by design: the
+/// caller pushes bytes in. So its slot instead holds the `PushBuffers` those
+/// pushed bytes accumulate in.
 ///
-/// There is no "input" for the push decoder by design (the idea is that
-/// the caller pushes data to the decoder as needed)..
+/// A fresh builder starts with empty buffers.
+/// [`ParquetPushDecoder::into_builder`] threads an existing decoder's buffers
+/// back through so a rebuilt decoder keeps the bytes it already holds.
 ///
-/// However, [`ArrowReaderBuilder`] is shared with the sync and async readers,
-/// which DO have an `input`. To support reusing the same builder code for
-/// all three types of decoders, we define this `NoInput` for the push decoder 
to
-/// denote in the type system there is no type.
-#[derive(Debug, Clone, Copy)]
-pub struct NoInput;
+/// The name predates the buffer-carrying behaviour; it still reads as "no
+/// *reader* input".
+#[derive(Debug)]
+pub struct NoInput {

Review Comment:
   Yep renamed :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to