zhuqi-lucas commented on code in PR #18817:
URL: https://github.com/apache/datafusion/pull/18817#discussion_r2581163868


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -752,10 +799,316 @@ fn should_enable_page_index(
             .unwrap_or(false)
 }
 
+fn reverse_batch(batch: RecordBatch) -> Result<RecordBatch> {
+    let num_rows = batch.num_rows();
+    if num_rows <= 1 {
+        return Ok(batch);
+    }
+
+    let indices = UInt32Array::from_iter_values((0..num_rows as u32).rev());
+
+    let reversed_columns = batch
+        .columns()
+        .iter()
+        .map(|col| take(col.as_ref(), &indices, None))
+        .collect::<std::result::Result<Vec<ArrayRef>, arrow::error::ArrowError>>()
+        .map_err(DataFusionError::from)?;
+
+    RecordBatch::try_new(batch.schema(), reversed_columns).map_err(DataFusionError::from)
+}
+
+/// Stream adapter for reversed parquet reading with row-group-level buffering.
+///
+/// # Architecture
+///
+/// This stream implements a sophisticated buffering strategy to achieve true reverse
+/// reading of Parquet files while maintaining compatibility with the underlying
+/// ParquetRecordBatchStream's optimizations (caching, prefetching, etc.).
+///
+/// ## Strategy Overview
+///
+/// 1. **Pre-reversed Row Groups**: Row groups are reversed BEFORE building the stream
+///    (via `row_group_indexes.reverse()`). This allows the Parquet reader to read
+///    them in reverse order while still utilizing internal optimizations.
+///
+/// 2. **Row-Group-Level Buffering**: As batches arrive from the input stream, we
+///    track which row group they belong to using cumulative row counts. This is
+///    the MINIMAL buffering unit required for correctness - we cannot reverse
+///    individual batches without knowing the complete row group context.
+///
+/// 3. **Two-Stage Reversal**: When a complete row group is collected:
+///    - Stage 1: Reverse rows within each batch (using Arrow's take kernel)
+///    - Stage 2: Reverse the order of batches within the row group
+///
+/// 4. **Progressive Output**: Reversed batches are output immediately, minimizing
+///    memory footprint. We never buffer more than one row group at a time.
+///
+/// ## Memory Characteristics
+///
+/// - **Bounded Memory**: Maximum memory usage = size of largest row group
+/// - **Typical Usage**: ~128MB (default Parquet row group size)
+/// - **Peak Usage**: During reversal of a single row group
+///
+/// ## Why Row-Group-Level Buffering is Necessary
+///
+/// Parquet organizes data into row groups (typically 128MB each), and each row group
+/// is independently compressed and encoded. When reading in reverse:
+///
+/// - We cannot reverse individual batches in isolation because they may span
+///   row group boundaries or be split arbitrarily by the batch_size parameter
+/// - We must buffer complete row groups to ensure correct ordering semantics
+/// - This is the minimum granularity that maintains correctness
+///
+/// ## Example
+///
+/// Given a file with 3 row groups, each containing 2 batches:
+///
+/// ```text
+/// Normal order:  RG0[B0, B1] -> RG1[B0, B1] -> RG2[B0, B1]
+/// Reversed:      RG2[B1_rev, B0_rev] -> RG1[B1_rev, B0_rev] -> RG0[B1_rev, B0_rev]
+///                     ^^^^^^^^^^^^         ^^^^^^^^^^^^         ^^^^^^^^^^^^
+///                     Output 1st           Output 2nd           Output 3rd
+/// ```
+///
+/// ## Performance Characteristics
+///
+/// - **Latency**: First batch available after reading complete first (reversed) row group
+/// - **Throughput**: Near-native speed with ~5-10% overhead for reversal operations
+/// - **Memory**: O(row_group_size), not O(file_size)
+///   TODO: should we support a max cache size to limit memory usage further? But if we exceed the cache size we can't reverse properly, so we would need to fall back to normal reading.
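
For illustration, the ordering semantics documented above (pre-reversed row groups plus the two-stage reversal within each row group) can be modeled with plain vectors. This is a hypothetical standalone sketch, not the PR's code: row groups are `Vec`s of batches, each batch a `Vec` of row ids, and `reverse_row_group` is an invented helper standing in for the stream adapter's take-kernel path:

```rust
// Hypothetical sketch of the documented strategy, using plain Vecs in place
// of RecordBatches. Not the PR's implementation.
fn reverse_row_group(batches: &[Vec<i32>]) -> Vec<Vec<i32>> {
    batches
        .iter()
        // Stage 1: reverse rows within each batch (stand-in for the take kernel)
        .map(|b| b.iter().rev().copied().collect::<Vec<_>>())
        // Stage 2: reverse the order of batches within the row group
        .rev()
        .collect()
}

fn main() {
    // 3 row groups, 2 batches each: RG0[B0, B1] -> RG1[B0, B1] -> RG2[B0, B1]
    let file: Vec<Vec<Vec<i32>>> = vec![
        vec![vec![0, 1], vec![2, 3]],
        vec![vec![4, 5], vec![6, 7]],
        vec![vec![8, 9], vec![10, 11]],
    ];

    // Row groups are pre-reversed before the "stream" is consumed
    let reversed_rows: Vec<i32> = file
        .iter()
        .rev()
        .flat_map(|rg| reverse_row_group(rg).into_iter().flatten())
        .collect();

    // The output must equal the globally reversed row order
    let expected: Vec<i32> = (0..12).rev().collect();
    assert_eq!(reversed_rows, expected);
    println!("{reversed_rows:?}");
}
```

Note that only one row group is materialized per `reverse_row_group` call, mirroring the bounded-memory claim: buffering never exceeds a single row group.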

Review Comment:
   Sure @xudong963 , created the follow-up issue:
   https://github.com/apache/datafusion/issues/19048



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

