zhuqi-lucas commented on code in PR #19064:
URL: https://github.com/apache/datafusion/pull/19064#discussion_r2619403195


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1344,4 +1397,268 @@ mod test {
         assert_eq!(num_batches, 0);
         assert_eq!(num_rows, 0);
     }
+
+    #[tokio::test]
+    async fn test_reverse_scan_row_groups() {
+        use parquet::file::properties::WriterProperties;
+
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        // Create multiple batches to ensure multiple row groups
+        let batch1 =
+            record_batch!(("a", Int32, vec![Some(1), Some(2), 
Some(3)])).unwrap();
+        let batch2 =
+            record_batch!(("a", Int32, vec![Some(4), Some(5), 
Some(6)])).unwrap();
+        let batch3 =
+            record_batch!(("a", Int32, vec![Some(7), Some(8), 
Some(9)])).unwrap();
+
+        // Write parquet file with multiple row groups
+        // Force small row groups by setting max_row_group_size
+        let props = WriterProperties::builder()
+            .set_max_row_group_size(3) // Force each batch into its own row group
+            .build();
+
+        let mut out = BytesMut::new().writer();
+        {
+            let mut writer =
+                ArrowWriter::try_new(&mut out, batch1.schema(), Some(props)).unwrap();
+            writer.write(&batch1).unwrap();
+            writer.write(&batch2).unwrap();
+            writer.write(&batch3).unwrap();
+            writer.finish().unwrap();
+        }
+        let data = out.into_inner().freeze();
+        let data_len = data.len();
+        store
+            .put(&Path::from("test.parquet"), data.into())
+            .await
+            .unwrap();
+
+        let schema = batch1.schema();
+        let file = PartitionedFile::new(
+            "test.parquet".to_string(),
+            u64::try_from(data_len).unwrap(),
+        );
+
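+        // Builds a ParquetOpener over the in-memory store; only the
+        // `reverse_scan_inexact` flag differs between the forward and
+        // reverse runs below.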
+        let make_opener = |reverse_scan: bool| ParquetOpener {
+            partition_index: 0,
+            projection: ProjectionExprs::from_indices(&[0], &schema),
+            batch_size: 1024,
+            limit: None,
+            predicate: None,
+            table_schema: TableSchema::from_file_schema(Arc::clone(&schema)),
+            metadata_size_hint: None,
+            metrics: ExecutionPlanMetricsSet::new(),
+            parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new(
+                Arc::clone(&store),
+            )),
+            pushdown_filters: false,
+            reorder_filters: false,
+            force_filter_selections: false,
+            enable_page_index: false,
+            enable_bloom_filter: false,
+            enable_row_group_stats_pruning: false,
+            coerce_int96: None,
+            #[cfg(feature = "parquet_encryption")]
+            file_decryption_properties: None,
+            expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory),
+            #[cfg(feature = "parquet_encryption")]
+            encryption_factory: None,
+            max_predicate_cache_size: None,
+            reverse_scan_inexact: reverse_scan,
+        };
+
+        // Test normal scan (forward)
+        let opener = make_opener(false);
+        let stream = opener.open(file.clone()).unwrap().await.unwrap();
+        let forward_values = collect_int32_values(stream).await;
+
+        // Test reverse scan
+        let opener = make_opener(true);
+        let stream = opener.open(file.clone()).unwrap().await.unwrap();
+        let reverse_values = collect_int32_values(stream).await;
+
+        // The forward scan should return data in the order written
+        assert_eq!(forward_values, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
+
+        // With reverse scan, row groups are reversed, so we expect:
+        // Row group 3 (7,8,9), then row group 2 (4,5,6), then row group 1 (1,2,3)
+        assert_eq!(reverse_values, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]);
+    }
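+
+    // A minimal sketch of the `collect_int32_values` helper used above. The
+    // real helper is defined elsewhere in this module; this version is an
+    // illustrative assumption (hence the `_sketch` suffix). It drains the
+    // stream and flattens the Int32 column at index 0 into a Vec<i32>.
+    #[allow(dead_code)]
+    async fn collect_int32_values_sketch<S, E>(mut stream: S) -> Vec<i32>
+    where
+        S: futures::Stream<Item = std::result::Result<arrow::record_batch::RecordBatch, E>>
+            + Unpin,
+        E: std::fmt::Debug,
+    {
+        use arrow::array::Int32Array;
+        use futures::StreamExt;
+        let mut values = Vec::new();
+        while let Some(batch) = stream.next().await {
+            let batch = batch.expect("stream should yield Ok batches");
+            let col = batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .expect("column 0 should be Int32");
+            values.extend(col.iter().flatten());
+        }
+        values
+    }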
+
+    #[tokio::test]
+    async fn test_reverse_scan_single_row_group() {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        // Create a single batch (single row group)
+        let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
+        let data_size =
+            write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;
+
+        let schema = batch.schema();
+        let file = PartitionedFile::new(
+            "test.parquet".to_string(),
+            u64::try_from(data_size).unwrap(),
+        );
+
+        let make_opener = |reverse_scan: bool| ParquetOpener {
+            partition_index: 0,
+            projection: ProjectionExprs::from_indices(&[0], &schema),
+            batch_size: 1024,
+            limit: None,
+            predicate: None,
+            table_schema: TableSchema::from_file_schema(Arc::clone(&schema)),
+            metadata_size_hint: None,
+            metrics: ExecutionPlanMetricsSet::new(),
+            parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new(
+                Arc::clone(&store),
+            )),
+            pushdown_filters: false,
+            reorder_filters: false,
+            force_filter_selections: false,
+            enable_page_index: false,
+            enable_bloom_filter: false,
+            enable_row_group_stats_pruning: false,
+            coerce_int96: None,
+            #[cfg(feature = "parquet_encryption")]
+            file_decryption_properties: None,
+            expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory),
+            #[cfg(feature = "parquet_encryption")]
+            encryption_factory: None,
+            max_predicate_cache_size: None,
+            reverse_scan_inexact: reverse_scan,
+        };
+
+        // With a single row group, forward and reverse should be the same
+        // (only the row group order is reversed, not the rows within)
+        let opener_forward = make_opener(false);
+        let stream_forward = opener_forward.open(file.clone()).unwrap().await.unwrap();
+        let (batches_forward, _) = count_batches_and_rows(stream_forward).await;
+
+        let opener_reverse = make_opener(true);
+        let stream_reverse = opener_reverse.open(file).unwrap().await.unwrap();
+        let (batches_reverse, _) = count_batches_and_rows(stream_reverse).await;
+
+        // Both should have the same number of batches since there's only one row group
+        assert_eq!(batches_forward, batches_reverse);
+    }
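+
+    // Likewise, a hypothetical sketch of the `count_batches_and_rows` helper
+    // used above (the real one is defined elsewhere in this module), assuming
+    // it returns (batch_count, total_row_count):
+    #[allow(dead_code)]
+    async fn count_batches_and_rows_sketch<S, E>(mut stream: S) -> (usize, usize)
+    where
+        S: futures::Stream<Item = std::result::Result<arrow::record_batch::RecordBatch, E>>
+            + Unpin,
+        E: std::fmt::Debug,
+    {
+        use futures::StreamExt;
+        let (mut batches, mut rows) = (0, 0);
+        while let Some(batch) = stream.next().await {
+            let batch = batch.expect("stream should yield Ok batches");
+            batches += 1;
+            rows += batch.num_rows();
+        }
+        (batches, rows)
+    }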
+
+    #[tokio::test]
+    async fn test_reverse_scan_with_row_selection() {
+        use parquet::file::properties::WriterProperties;
+
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        // Create 3 batches (one per row group); each row group will get a different selection pattern
+        let batch1 =
+            record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3), 
Some(4)]))
+                .unwrap(); // 4 rows
+        let batch2 =
+            record_batch!(("a", Int32, vec![Some(5), Some(6), Some(7), 
Some(8)]))
+                .unwrap(); // 4 rows
+        let batch3 =
+            record_batch!(("a", Int32, vec![Some(9), Some(10), Some(11), 
Some(12)]))
+                .unwrap(); // 4 rows
+
+        let props = WriterProperties::builder()
+            .set_max_row_group_size(4)
+            .build();
+
+        let mut out = BytesMut::new().writer();
+        {
+            let mut writer =
+                ArrowWriter::try_new(&mut out, batch1.schema(), Some(props)).unwrap();
+            writer.write(&batch1).unwrap();
+            writer.write(&batch2).unwrap();
+            writer.write(&batch3).unwrap();
+            writer.finish().unwrap();
+        }
+        let data = out.into_inner().freeze();
+        let data_len = data.len();
+        store
+            .put(&Path::from("test.parquet"), data.into())
+            .await
+            .unwrap();
+
+        let schema = batch1.schema();
+
+        use crate::ParquetAccessPlan;
+        use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
+
+        let mut access_plan = ParquetAccessPlan::new_all(3);
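+        // new_all(3) marks all 3 row groups for a full scan; the
+        // scan_selection calls below narrow individual row groups.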
+        // Row group 0: skip first 2, select last 2 (should get: 3, 4)
+        access_plan.scan_selection(
+            0,
+            RowSelection::from(vec![RowSelector::skip(2), RowSelector::select(2)]),
+        );
+        // Row group 1: select all (should get: 5, 6, 7, 8)
+        // Row group 2: select first 2, skip last 2 (should get: 9, 10)
+        access_plan.scan_selection(
+            2,
+            RowSelection::from(vec![RowSelector::select(2), RowSelector::skip(2)]),
+        );
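+        // A forward scan with these selections yields 3,4 then 5,6,7,8 then 9,10.
+        // With reverse_scan_inexact, the row-group order is reversed while each
+        // group's selection still applies, so the expected order is 9,10 then
+        // 5,6,7,8 then 3,4.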
+
+        let file = PartitionedFile::new(
+            "test.parquet".to_string(),
+            u64::try_from(data_len).unwrap(),
+        )
+        .with_extensions(Arc::new(access_plan));
+
+        let make_opener = |reverse_scan: bool| ParquetOpener {

Review Comment:
   Thanks @alamb, created the ticket for it:
   https://github.com/apache/datafusion/issues/19333


