alamb commented on code in PR #9968:
URL: https://github.com/apache/arrow-rs/pull/9968#discussion_r3250507202


##########
parquet/src/arrow/push_decoder/remaining.rs:
##########
@@ -213,6 +248,35 @@ impl RemainingRowGroups {
         }
     }
 
+    /// Decompose into [`RemainingRowGroupsParts`].
+    ///
+    /// Must be called at a row-group boundary (see
+    /// [`Self::is_at_row_group_boundary`]). The inner reader builder's runtime
+    /// decode state is discarded; its buffered bytes are carried through in
+    /// [`RemainingRowGroupsParts::reader_builder`].
+    pub(crate) fn into_parts(self) -> RemainingRowGroupsParts {
+        let Self {
+            schema,
+            frontier,
+            row_group_reader_builder,
+        } = self;
+        let RowGroupFrontier {
+            parquet_metadata: _,

Review Comment:
   we probably want to pass along the metadata as well, right?
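   Perhaps something like this, so a rebuilt decoder can reuse the already
   decoded footer instead of fetching and parsing it again (untested sketch;
   the field name is just a placeholder):
   ```rust
   pub(crate) struct RemainingRowGroupsParts {
       // ...existing fields (schema, reader_builder, ...)
       /// Metadata carried over from the original decoder.
       pub(crate) parquet_metadata: Arc<ParquetMetaData>,
   }
   ```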



##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -109,19 +109,117 @@ use std::sync::Arc;
 ///         }
 ///     }
 /// ```
+///
+/// # Adaptive scans
+///
+/// The scan strategy is not fixed once [`build`](Self::build) is called: it
+/// can be changed *while decoding*, at row-group boundaries.
+///
+/// The important API for this is [`ParquetPushDecoder::try_next_reader`].
+/// Unlike [`try_decode`](ParquetPushDecoder::try_decode), which barrels
+/// straight through row-group boundaries, `try_next_reader` returns once per
+/// row group — leaving a clean window *between* row groups. At any such
+/// boundary, [`ParquetPushDecoder::into_builder`] hands back a
+/// `ParquetPushDecoderBuilder` for the row groups not yet decoded. Change any
+/// option on it (projection, row filter, row selection policy, …) and
+/// [`build`](Self::build) a fresh decoder that resumes from the next row
+/// group. This is how a query engine promotes or demotes filters — for
+/// example turning a row filter on or off — based on the selectivity observed
+/// in the row groups decoded so far.
+///
+/// ```
+/// # use std::ops::Range;
+/// # use std::sync::Arc;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ProjectionMask;
+/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
+/// # use parquet::file::properties::WriterProperties;
+/// # let file_bytes = {
+/// #   let batch = record_batch!(
+/// #       ("a", Int32, [1, 2, 3, 4, 5, 6]),
+/// #       ("b", Int32, [6, 5, 4, 3, 2, 1])
+/// #   ).unwrap();
+/// #   // Small row groups so the test file has two of them.
+/// #   let props = WriterProperties::builder().set_max_row_group_row_count(Some(3)).build();
+/// #   let mut buffer = vec![];
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// # let get_range = |r: &Range<u64>| file_bytes.slice(r.start as usize..r.end as usize);
+/// # let file_length = file_bytes.len() as u64;
+/// # let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
+/// # metadata_decoder.push_ranges(vec![0..file_length], vec![file_bytes.clone()]).unwrap();
+/// # let DecodeResult::Data(parquet_metadata) = metadata_decoder.try_decode().unwrap() else { panic!() };
+/// # let parquet_metadata = Arc::new(parquet_metadata);
+/// let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata)
+///     .unwrap()
+///     .build()
+///     .unwrap();
+///
+/// // Drive the decoder one row group at a time with `try_next_reader`.
+/// loop {
+///     match decoder.try_next_reader().unwrap() {
+///         DecodeResult::NeedsData(ranges) => {
+///             // Fetch and hand over the bytes the decoder asked for.
+///             let data = ranges.iter().map(|r| get_range(r)).collect();
+///             decoder.push_ranges(ranges, data).unwrap();
+///         }
+///         DecodeResult::Data(reader) => {
+///             // Decode this row group's batches.
+///             for batch in reader {
+///                 assert!(batch.unwrap().num_rows() > 0);
+///             }
+///             // We are now at a row-group boundary. Based on whatever stats
+///             // were gathered, optionally change strategy for the row groups
+///             // still to come: drop or promote a row filter, narrow or widen

Review Comment:
   this is very cool



##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -156,15 +254,29 @@ impl ParquetPushDecoderBuilder {
     /// See [`ArrowReaderMetadata::try_new`] for how to create the metadata from
     /// the Parquet metadata and reader options.
     pub fn new_with_metadata(arrow_reader_metadata: ArrowReaderMetadata) -> Self {
-        Self::new_builder(NoInput, arrow_reader_metadata)
+        Self::new_builder(NoInput::default(), arrow_reader_metadata)
+    }
+
+    /// Reuse a [`PushBuffers`] when [`build`](Self::build)ing the decoder so
+    /// that bytes already fetched are not requested again.
+    ///
+    /// This is how [`ParquetPushDecoder::into_builder`] carries a decoder's

Review Comment:
   I don't think this context is particularly relevant -- it would be simpler if the API was just like "provide a preexisting PushBuffers to read from".
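   i.e. something documented purely in terms of the input (untested; the
   method name here is hypothetical):
   ```rust
   /// Use `buffers` as the decoder's initial buffered bytes, so that ranges
   /// already present are not requested again when the decoder is built.
   pub fn with_buffers(self, buffers: PushBuffers) -> Self {
       // stash the buffers in the builder's input slot
       todo!()
   }
   ```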



##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -109,19 +109,117 @@ use std::sync::Arc;
 ///         }
 ///     }
 /// ```
+///
+/// # Adaptive scans
+///
+/// The scan strategy is not fixed once [`build`](Self::build) is called: it
+/// can be changed *while decoding*, at row-group boundaries.
+///
+/// The important API for this is [`ParquetPushDecoder::try_next_reader`].
+/// Unlike [`try_decode`](ParquetPushDecoder::try_decode), which barrels
+/// straight through row-group boundaries, `try_next_reader` returns once per
+/// row group — leaving a clean window *between* row groups. At any such
+/// boundary, [`ParquetPushDecoder::into_builder`] hands back a
+/// `ParquetPushDecoderBuilder` for the row groups not yet decoded. Change any
+/// option on it (projection, row filter, row selection policy, …) and
+/// [`build`](Self::build) a fresh decoder that resumes from the next row
+/// group. This is how a query engine promotes or demotes filters — for
+/// example turning a row filter on or off — based on the selectivity observed
+/// in the row groups decoded so far.
+///
+/// ```
+/// # use std::ops::Range;
+/// # use std::sync::Arc;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ProjectionMask;
+/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
+/// # use parquet::file::properties::WriterProperties;
+/// # let file_bytes = {
+/// #   let batch = record_batch!(
+/// #       ("a", Int32, [1, 2, 3, 4, 5, 6]),
+/// #       ("b", Int32, [6, 5, 4, 3, 2, 1])
+/// #   ).unwrap();
+/// #   // Small row groups so the test file has two of them.
+/// #   let props = WriterProperties::builder().set_max_row_group_row_count(Some(3)).build();
+/// #   let mut buffer = vec![];
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// # let get_range = |r: &Range<u64>| file_bytes.slice(r.start as usize..r.end as usize);
+/// # let file_length = file_bytes.len() as u64;
+/// # let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
+/// # metadata_decoder.push_ranges(vec![0..file_length], vec![file_bytes.clone()]).unwrap();
+/// # let DecodeResult::Data(parquet_metadata) = metadata_decoder.try_decode().unwrap() else { panic!() };
+/// # let parquet_metadata = Arc::new(parquet_metadata);
+/// let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata)
+///     .unwrap()
+///     .build()
+///     .unwrap();
+///
+/// // Drive the decoder one row group at a time with `try_next_reader`.
+/// loop {
+///     match decoder.try_next_reader().unwrap() {
+///         DecodeResult::NeedsData(ranges) => {
+///             // Fetch and hand over the bytes the decoder asked for.
+///             let data = ranges.iter().map(|r| get_range(r)).collect();
+///             decoder.push_ranges(ranges, data).unwrap();
+///         }
+///         DecodeResult::Data(reader) => {
+///             // Decode this row group's batches.
+///             for batch in reader {
+///                 assert!(batch.unwrap().num_rows() > 0);
+///             }
+///             // We are now at a row-group boundary. Based on whatever stats
+///             // were gathered, optionally change strategy for the row groups
+///             // still to come: drop or promote a row filter, narrow or widen
+///             // the projection, etc.
+///             if decoder.is_at_row_group_boundary() && decoder.row_groups_remaining() > 0 {
+///                 let builder = decoder.into_builder().unwrap();
+///                 // e.g. column "b" turned out not to be needed.
+///                 let projection = ProjectionMask::columns(builder.parquet_schema(), ["a"]);
+///                 decoder = builder.with_projection(projection).build().unwrap();
+///             }
+///         }
+///         DecodeResult::Finished => break,
+///     }
+/// }
+/// ```
 pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<NoInput>;
 
-/// Type that represents "No input" for the [`ParquetPushDecoderBuilder`]
+/// The `input` slot of a [`ParquetPushDecoderBuilder`].
+///
+/// [`ArrowReaderBuilder`] is shared by the sync, async, and push decoders.
+/// The sync and async builders put a real input there — a file or async
+/// reader — to read from. The push decoder has no reader, by design: the
+/// caller pushes bytes in. So its slot instead holds the `PushBuffers` those
+/// pushed bytes accumulate in.
 ///
-/// There is no "input" for the push decoder by design (the idea is that
-/// the caller pushes data to the decoder as needed)..
+/// A fresh builder starts with empty buffers.
+/// [`ParquetPushDecoder::into_builder`] threads an existing decoder's buffers
+/// back through so a rebuilt decoder keeps the bytes it already holds.
 ///
-/// However, [`ArrowReaderBuilder`] is shared with the sync and async readers,
-/// which DO have an `input`. To support reusing the same builder code for
-/// all three types of decoders, we define this `NoInput` for the push decoder to
-/// denote in the type system there is no type.
-#[derive(Debug, Clone, Copy)]
-pub struct NoInput;
+/// The name predates the buffer-carrying behaviour; it still reads as "no
+/// *reader* input".
+#[derive(Debug)]
+pub struct NoInput {

Review Comment:
   Since this is technically a breaking change anyway, perhaps we should just rename the struct to `BuffersInput` or `PushDecoderInput`?
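   A rename could also keep the old name compiling for one release via a
   deprecated alias, e.g. (untested; the `buffers` field is a guess from the
   truncated hunk above):
   ```rust
   /// Holds the bytes pushed into a [`ParquetPushDecoderBuilder`].
   #[derive(Debug, Default)]
   pub struct PushDecoderInput {
       buffers: PushBuffers,
   }

   /// Old name, kept temporarily to ease migration.
   #[deprecated(note = "renamed to `PushDecoderInput`")]
   pub type NoInput = PushDecoderInput;
   ```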



##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -1476,6 +1770,310 @@ mod test {
         expect_finished(decoder.try_decode());
     }
 
+    /// `into_builder` between row groups recovers a builder for the
+    /// not-yet-decoded row groups; rebuilding it with a new row filter
+    /// applies that filter to the subsequent row groups while leaving the
+    /// already-decoded row group's results untouched.
+    ///
+    /// Adaptive callers should drive the decoder with `try_next_reader`

Review Comment:
   I think this context is now covered in the decoder builder docs -- perhaps you can just leave a reference to those comments here.
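   e.g. something as short as:
   ```rust
   /// See the "Adaptive scans" section of the [`ParquetPushDecoderBuilder`]
   /// docs for why adaptive callers drive the decoder with `try_next_reader`
   /// rather than `try_decode`.
   ```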



##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -1476,6 +1770,310 @@ mod test {
         expect_finished(decoder.try_decode());
     }
 
+    /// `into_builder` between row groups recovers a builder for the
+    /// not-yet-decoded row groups; rebuilding it with a new row filter
+    /// applies that filter to the subsequent row groups while leaving the
+    /// already-decoded row group's results untouched.
+    ///
+    /// Adaptive callers should drive the decoder with `try_next_reader`
+    /// rather than `try_decode`: `try_next_reader` returns once per row
+    /// group, giving the caller a clean window between two consecutive
+    /// returns to inspect stats and reconfigure the scan. `try_decode`
+    /// barrels through row-group boundaries and is unsuitable for in-flight
+    /// strategy changes.
+    #[test]
+    fn test_into_builder_installs_filter_between_row_groups() {
+        let metadata = test_file_parquet_metadata();
+        let schema_descr = metadata.file_metadata().schema_descr_ptr();
+
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata)
+            .unwrap()
+            .with_batch_size(1024)
+            .build()
+            .unwrap();
+        decoder
+            .push_range(test_file_range(), TEST_FILE_DATA.clone())
+            .unwrap();
+
+        // Reader for row group 0 — no filter.
+        let reader0 = expect_data(decoder.try_next_reader());
+        let batches0: Vec<_> = reader0.collect::<Result<_, _>>().unwrap();
+        let batch0 = concat_batches(&TEST_BATCH.schema(), &batches0).unwrap();
+        assert_eq!(batch0, TEST_BATCH.slice(0, 200));
+
+        // We're between row groups now. Rebuild with a filter on column "a".
+        assert!(decoder.is_at_row_group_boundary());
+        assert_eq!(decoder.row_groups_remaining(), 1);
+        let filter =
+            ArrowPredicateFn::new(ProjectionMask::columns(&schema_descr, ["a"]), |batch| {
+                gt(batch.column(0), &Int64Array::new_scalar(250))
+            });
+        let mut decoder = decoder
+            .into_builder()
+            .unwrap()
+            .with_row_filter(RowFilter::new(vec![Box::new(filter)]))
+            .build()
+            .unwrap();
+
+        // Reader for row group 1 — filter applied. The rebuilt decoder kept
+        // the buffered bytes (see `test_into_builder_preserves_buffered_bytes`)
+        // so no data needs to be re-supplied. Column "a" in RG1 has values
+        // 200..399; `a > 250` keeps 251..399 = 149 rows.
+        let reader1 = expect_data(decoder.try_next_reader());
+        let batches1: Vec<_> = reader1.collect::<Result<_, _>>().unwrap();
+        let batch1 = concat_batches(&TEST_BATCH.schema(), &batches1).unwrap();
+        assert_eq!(batch1, TEST_BATCH.slice(251, 149));
+        expect_finished(decoder.try_next_reader());
+    }
+
+    /// `into_builder` is rejected while a row group's reader is being
+    /// drained (`DecodingRowGroup`); the error points at
+    /// `is_at_row_group_boundary`.
+    #[test]
+    fn test_into_builder_rejected_mid_row_group() {
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .with_batch_size(50)
+            .build()
+            .unwrap();
+        decoder
+            .push_range(test_file_range(), TEST_FILE_DATA.clone())
+            .unwrap();
+
+        // After getting the first batch, we're inside `DecodingRowGroup`:
+        // an active reader is still alive. Mid-reader is not a boundary.
+        let _ = expect_data(decoder.try_decode());

Review Comment:
   why ignore the output?
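   Asserting on it would also pin the data down, e.g. (assuming the first
   batch fills the `batch_size` of 50 configured above):
   ```rust
   let batch = expect_data(decoder.try_decode());
   assert_eq!(batch.num_rows(), 50);
   ```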



##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -109,19 +109,117 @@ use std::sync::Arc;
 ///         }
 ///     }
 /// ```
+///
+/// # Adaptive scans
+///
+/// The scan strategy is not fixed once [`build`](Self::build) is called: it
+/// can be changed *while decoding*, at row-group boundaries.
+///
+/// The important API for this is [`ParquetPushDecoder::try_next_reader`].
+/// Unlike [`try_decode`](ParquetPushDecoder::try_decode), which barrels
+/// straight through row-group boundaries, `try_next_reader` returns once per
+/// row group — leaving a clean window *between* row groups. At any such
+/// boundary, [`ParquetPushDecoder::into_builder`] hands back a
+/// `ParquetPushDecoderBuilder` for the row groups not yet decoded. Change any
+/// option on it (projection, row filter, row selection policy, …) and
+/// [`build`](Self::build) a fresh decoder that resumes from the next row
+/// group. This is how a query engine promotes or demotes filters — for
+/// example turning a row filter on or off — based on the selectivity observed
+/// in the row groups decoded so far.
+///
+/// ```
+/// # use std::ops::Range;
+/// # use std::sync::Arc;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ProjectionMask;
+/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
+/// # use parquet::file::properties::WriterProperties;
+/// # let file_bytes = {
+/// #   let batch = record_batch!(
+/// #       ("a", Int32, [1, 2, 3, 4, 5, 6]),
+/// #       ("b", Int32, [6, 5, 4, 3, 2, 1])
+/// #   ).unwrap();
+/// #   // Small row groups so the test file has two of them.
+/// #   let props = WriterProperties::builder().set_max_row_group_row_count(Some(3)).build();
+/// #   let mut buffer = vec![];
+/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
+/// #   writer.write(&batch).unwrap();
+/// #   writer.close().unwrap();
+/// #   Bytes::from(buffer)
+/// # };
+/// # let get_range = |r: &Range<u64>| file_bytes.slice(r.start as usize..r.end as usize);
+/// # let file_length = file_bytes.len() as u64;
+/// # let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
+/// # metadata_decoder.push_ranges(vec![0..file_length], vec![file_bytes.clone()]).unwrap();
+/// # let DecodeResult::Data(parquet_metadata) = metadata_decoder.try_decode().unwrap() else { panic!() };
+/// # let parquet_metadata = Arc::new(parquet_metadata);
+/// let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata)
+///     .unwrap()
+///     .build()
+///     .unwrap();
+///
+/// // Drive the decoder one row group at a time with `try_next_reader`.
+/// loop {
+///     match decoder.try_next_reader().unwrap() {
+///         DecodeResult::NeedsData(ranges) => {
+///             // Fetch and hand over the bytes the decoder asked for.
+///             let data = ranges.iter().map(|r| get_range(r)).collect();
+///             decoder.push_ranges(ranges, data).unwrap();
+///         }
+///         DecodeResult::Data(reader) => {
+///             // Decode this row group's batches.
+///             for batch in reader {
+///                 assert!(batch.unwrap().num_rows() > 0);
+///             }
+///             // We are now at a row-group boundary. Based on whatever stats
+///             // were gathered, optionally change strategy for the row groups
+///             // still to come: drop or promote a row filter, narrow or widen
+///             // the projection, etc.
+///             if decoder.is_at_row_group_boundary() && decoder.row_groups_remaining() > 0 {
+///                 let builder = decoder.into_builder().unwrap();
+///                 // e.g. column "b" turned out not to be needed.
+///                 let projection = ProjectionMask::columns(builder.parquet_schema(), ["a"]);
+///                 decoder = builder.with_projection(projection).build().unwrap();
+///             }
+///         }
+///         DecodeResult::Finished => break,
+///     }
+/// }
+/// ```
 pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<NoInput>;
 
-/// Type that represents "No input" for the [`ParquetPushDecoderBuilder`]
+/// The `input` slot of a [`ParquetPushDecoderBuilder`].
+///
+/// [`ArrowReaderBuilder`] is shared by the sync, async, and push decoders.
+/// The sync and async builders put a real input there — a file or async
+/// reader — to read from. The push decoder has no reader, by design: the
+/// caller pushes bytes in. So its slot instead holds the `PushBuffers` those
+/// pushed bytes accumulate in.
 ///
-/// There is no "input" for the push decoder by design (the idea is that
-/// the caller pushes data to the decoder as needed)..
+/// A fresh builder starts with empty buffers.
+/// [`ParquetPushDecoder::into_builder`] threads an existing decoder's buffers
+/// back through so a rebuilt decoder keeps the bytes it already holds.
 ///
-/// However, [`ArrowReaderBuilder`] is shared with the sync and async readers,
-/// which DO have an `input`. To support reusing the same builder code for
-/// all three types of decoders, we define this `NoInput` for the push decoder to
-/// denote in the type system there is no type.
-#[derive(Debug, Clone, Copy)]
-pub struct NoInput;
+/// The name predates the buffer-carrying behaviour; it still reads as "no

Review Comment:
   this sentence (and the rest of these comments) can probably be cleaned up, but otherwise this looks good



##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -1476,6 +1770,310 @@ mod test {
         expect_finished(decoder.try_decode());
     }
 
+    /// `into_builder` between row groups recovers a builder for the
+    /// not-yet-decoded row groups; rebuilding it with a new row filter
+    /// applies that filter to the subsequent row groups while leaving the
+    /// already-decoded row group's results untouched.
+    ///
+    /// Adaptive callers should drive the decoder with `try_next_reader`
+    /// rather than `try_decode`: `try_next_reader` returns once per row
+    /// group, giving the caller a clean window between two consecutive
+    /// returns to inspect stats and reconfigure the scan. `try_decode`
+    /// barrels through row-group boundaries and is unsuitable for in-flight
+    /// strategy changes.
+    #[test]
+    fn test_into_builder_installs_filter_between_row_groups() {
+        let metadata = test_file_parquet_metadata();
+        let schema_descr = metadata.file_metadata().schema_descr_ptr();
+
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata)
+            .unwrap()
+            .with_batch_size(1024)
+            .build()
+            .unwrap();
+        decoder
+            .push_range(test_file_range(), TEST_FILE_DATA.clone())
+            .unwrap();
+
+        // Reader for row group 0 — no filter.
+        let reader0 = expect_data(decoder.try_next_reader());
+        let batches0: Vec<_> = reader0.collect::<Result<_, _>>().unwrap();
+        let batch0 = concat_batches(&TEST_BATCH.schema(), &batches0).unwrap();
+        assert_eq!(batch0, TEST_BATCH.slice(0, 200));
+
+        // We're between row groups now. Rebuild with a filter on column "a".
+        assert!(decoder.is_at_row_group_boundary());
+        assert_eq!(decoder.row_groups_remaining(), 1);
+        let filter =
+            ArrowPredicateFn::new(ProjectionMask::columns(&schema_descr, ["a"]), |batch| {
+                gt(batch.column(0), &Int64Array::new_scalar(250))
+            });
+        let mut decoder = decoder
+            .into_builder()
+            .unwrap()
+            .with_row_filter(RowFilter::new(vec![Box::new(filter)]))
+            .build()
+            .unwrap();
+
+        // Reader for row group 1 — filter applied. The rebuilt decoder kept
+        // the buffered bytes (see `test_into_builder_preserves_buffered_bytes`)
+        // so no data needs to be re-supplied. Column "a" in RG1 has values
+        // 200..399; `a > 250` keeps 251..399 = 149 rows.
+        let reader1 = expect_data(decoder.try_next_reader());
+        let batches1: Vec<_> = reader1.collect::<Result<_, _>>().unwrap();
+        let batch1 = concat_batches(&TEST_BATCH.schema(), &batches1).unwrap();
+        assert_eq!(batch1, TEST_BATCH.slice(251, 149));
+        expect_finished(decoder.try_next_reader());
+    }
+
+    /// `into_builder` is rejected while a row group's reader is being
+    /// drained (`DecodingRowGroup`); the error points at
+    /// `is_at_row_group_boundary`.
+    #[test]
+    fn test_into_builder_rejected_mid_row_group() {
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .with_batch_size(50)
+            .build()
+            .unwrap();
+        decoder
+            .push_range(test_file_range(), TEST_FILE_DATA.clone())
+            .unwrap();
+
+        // After getting the first batch, we're inside `DecodingRowGroup`:
+        // an active reader is still alive. Mid-reader is not a boundary.
+        let _ = expect_data(decoder.try_decode());
+        assert!(!decoder.is_at_row_group_boundary());
+
+        let err = decoder.into_builder().unwrap_err();
+        let err_msg = format!("{err}");
+        assert!(
+            err_msg.contains("is_at_row_group_boundary"),
+            "unexpected error: {err_msg}"
+        );
+    }
+
+    /// `into_builder` is rejected once the decoder has finished.
+    #[test]
+    fn test_into_builder_rejected_on_finished_decoder() {
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .build()
+            .unwrap();
+        decoder
+            .push_range(test_file_range(), TEST_FILE_DATA.clone())
+            .unwrap();

Review Comment:
   A lot of this test setup is repeated multiple times -- maybe we can make a function to avoid repeating the same 8 lines many times?
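   Untested sketch (the name is just a suggestion):
   ```rust
   /// Build a push decoder over the test file with all of its bytes pushed.
   fn test_decoder(batch_size: usize) -> ParquetPushDecoder {
       let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
           .unwrap()
           .with_batch_size(batch_size)
           .build()
           .unwrap();
       decoder
           .push_range(test_file_range(), TEST_FILE_DATA.clone())
           .unwrap();
       decoder
   }
   ```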



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
