jkylling commented on code in PR #20133:
URL: https://github.com/apache/datafusion/pull/20133#discussion_r2758686935


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -2004,4 +2032,649 @@ mod test {
             "Reverse scan with non-contiguous row groups should correctly map 
RowSelection"
         );
     }
+
+    /// Options for reading parquet files in tests
+    #[derive(Default)]
+    struct ReadOptions {
+        projection: Option<Vec<usize>>,
+        partition_values: Option<Vec<ScalarValue>>,
+        predicate: Option<Arc<dyn PhysicalExpr>>,
+    }
+
+    /// Writes a batch to parquet and reads it back with the given options.
+    /// Returns a single RecordBatch.
+    async fn read_parquet(
+        batch: arrow::record_batch::RecordBatch,
+        table_schema: TableSchema,
+        options: ReadOptions,
+    ) -> arrow::record_batch::RecordBatch {
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        let data_size =
+            write_parquet(Arc::clone(&store), "test.parquet", 
batch.clone()).await;
+
+        let mut file = PartitionedFile::new("test.parquet".to_string(), 
data_size as u64);
+        if let Some(partition_values) = options.partition_values {
+            file.partition_values = partition_values;
+        }
+
+        let mut builder = ParquetOpenerBuilder::new()
+            .with_store(Arc::clone(&store))
+            .with_table_schema(table_schema.clone());
+        if let Some(projection) = options.projection {
+            builder = builder.with_projection_indices(&projection);
+        }
+        if let Some(predicate) = options.predicate {
+            builder = builder
+                .with_predicate(predicate)
+                .with_pushdown_filters(true);
+        }
+        let opener = builder.build();
+        let mut stream = opener.open(file).unwrap().await.unwrap();
+
+        let mut batches = vec![];
+        while let Some(Ok(batch)) = stream.next().await {
+            batches.push(batch);
+        }
+        assert_eq!(batches.len(), 1, "Expected exactly one batch");
+        batches.into_iter().next().unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_virtual_columns() {
+        let parquet_data =
+            record_batch!(("a", Int32, vec![Some(1), Some(2), 
Some(3)])).unwrap();
+        let row_number_field = Arc::new(
+            Field::new("row_index", DataType::Int64, false)
+                .with_extension_type(RowNumber),
+        );
+        let table_schema = TableSchema::new_with_virtual_columns(
+            Arc::clone(&parquet_data.schema()),
+            vec![row_number_field],
+            vec![],
+        );
+        let batch =
+            read_parquet(parquet_data, table_schema, 
ReadOptions::default()).await;
+
+        let output_schema = batch.schema();
+        assert_eq!(
+            output_schema.fields().len(),
+            2,
+            "Output should have 2 columns (a and row_index)"
+        );
+        assert_eq!(output_schema.field(0).name(), "a");
+        assert_eq!(output_schema.field(1).name(), "row_index");
+
+        let a_values = batch
+            .column(0)
+            .as_primitive::<Int32Type>()
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        assert_eq!(a_values, vec![1, 2, 3]);
+
+        let row_index_values = batch
+            .column(1)
+            .as_primitive::<Int64Type>()
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        assert_eq!(row_index_values, vec![0, 1, 2]);
+    }
+
+    #[tokio::test]
+    async fn test_virtual_columns_with_projections() {
+        let parquet_data =
+            record_batch!(("a", Int32, vec![Some(1), Some(2), 
Some(3)])).unwrap();
+        let row_number_field = Arc::new(
+            Field::new("row_index", DataType::Int64, false)
+                .with_extension_type(RowNumber),
+        );
+        let table_schema = TableSchema::new_with_virtual_columns(
+            Arc::clone(&parquet_data.schema()),
+            vec![row_number_field],
+            vec![],
+        );
+
+        // Project only the virtual column (index 1)
+        let batch = read_parquet(
+            parquet_data,
+            table_schema,
+            ReadOptions {
+                projection: Some(vec![1]),
+                ..Default::default()
+            },
+        )
+        .await;
+
+        let output_schema = batch.schema();
+        assert_eq!(
+            output_schema.fields().len(),
+            1,
+            "Output should have 1 column (row_index)"
+        );
+        assert_eq!(output_schema.field(0).name(), "row_index");
+
+        let row_index_values = batch
+            .column(0)
+            .as_primitive::<Int64Type>()
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        assert_eq!(row_index_values, vec![0, 1, 2]);
+    }
+
+    #[tokio::test]
+    async fn test_virtual_columns_with_partition_columns() {
+        let parquet_data =
+            record_batch!(("a", Int32, vec![Some(1), Some(2), 
Some(3)])).unwrap();
+
+        let row_number_field = Arc::new(
+            Field::new("row_index", DataType::Int64, false)
+                .with_extension_type(RowNumber),
+        );
+        let partition_col = Arc::new(Field::new("region", DataType::Utf8, 
false));
+        let table_schema = TableSchema::new_with_virtual_columns(
+            Arc::clone(&parquet_data.schema()),
+            vec![row_number_field],
+            vec![partition_col],
+        );
+
+        // Project all columns: file column (0), virtual column (1), partition 
column (2)
+        let batch = read_parquet(
+            parquet_data,
+            table_schema,
+            ReadOptions {
+                partition_values: Some(vec![ScalarValue::Utf8(Some(
+                    "europe".to_string(),
+                ))]),
+                ..Default::default()
+            },
+        )
+        .await;
+
+        let output_schema = batch.schema();
+        assert_eq!(
+            output_schema.fields().len(),
+            3,
+            "Output should have 3 columns (a, row_index, region)"
+        );
+        assert_eq!(output_schema.field(0).name(), "a");
+        assert_eq!(output_schema.field(1).name(), "row_index");
+        assert_eq!(output_schema.field(2).name(), "region");
+
+        let a_values = batch
+            .column(0)
+            .as_primitive::<Int32Type>()
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        assert_eq!(a_values, vec![1, 2, 3], "File column 'a' values");
+
+        let row_index_values = batch
+            .column(1)
+            .as_primitive::<Int64Type>()
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        assert_eq!(
+            row_index_values,
+            vec![0, 1, 2],
+            "Virtual column 'row_index' values"
+        );
+
+        let region_values = batch
+            .column(2)
+            .as_any()
+            .downcast_ref::<arrow::array::StringArray>()
+            .unwrap()
+            .iter()
+            .map(|v| v.unwrap().to_string())
+            .collect::<Vec<_>>();
+        assert_eq!(
+            region_values,
+            vec!["europe", "europe", "europe"],
+            "Partition column 'region' values"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_partition_and_virtual_columns_only() {
+        let parquet_data =
+            record_batch!(("a", Int32, vec![Some(1), Some(2), 
Some(3)])).unwrap();
+
+        let row_number_field = Arc::new(
+            Field::new("row_index", DataType::Int64, false)
+                .with_extension_type(RowNumber),
+        );
+        let partition_col = Arc::new(Field::new("year", DataType::Int32, 
false));
+        let table_schema = TableSchema::new_with_virtual_columns(
+            Arc::clone(&parquet_data.schema()),
+            vec![row_number_field],
+            vec![partition_col],
+        );
+
+        let batch = read_parquet(
+            parquet_data,
+            table_schema,
+            ReadOptions {
+                projection: Some(vec![1, 2]),
+                partition_values: Some(vec![ScalarValue::Int32(Some(2026))]),
+                ..Default::default()
+            },
+        )
+        .await;
+
+        let output_schema = batch.schema();
+        assert_eq!(
+            output_schema.fields().len(),
+            2,
+            "Output should have 2 columns (row_index, year)"
+        );
+        assert_eq!(output_schema.field(0).name(), "row_index");
+        assert_eq!(output_schema.field(1).name(), "year");
+
+        let row_index_values = batch
+            .column(0)
+            .as_primitive::<Int64Type>()
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        assert_eq!(
+            row_index_values,
+            vec![0, 1, 2],
+            "Virtual column 'row_index' values"
+        );
+
+        let year_values = batch
+            .column(1)
+            .as_primitive::<Int32Type>()
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        assert_eq!(
+            year_values,
+            vec![2026, 2026, 2026],
+            "Partition column 'year' values"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_nested_schema_projections() {
+        use arrow::array::{ArrayRef, Int32Array, StructArray};
+        // Create nested schema: a: {b: int32, c: {d: int32, e: int32}}
+        let inner_struct_fields = vec![
+            Field::new("d", DataType::Int32, false),
+            Field::new("e", DataType::Int32, false),
+        ];
+        let inner_struct_type = 
DataType::Struct(inner_struct_fields.clone().into());
+
+        let outer_struct_fields = vec![
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", inner_struct_type.clone(), false),
+        ];
+        let outer_struct_type = 
DataType::Struct(outer_struct_fields.clone().into());
+
+        let a_struct = {
+            let d_array: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 
30]));
+            let e_array: ArrayRef = Arc::new(Int32Array::from(vec![100, 200, 
300]));
+            let c_struct = StructArray::from(vec![
+                (Arc::new(inner_struct_fields[0].clone()), d_array),
+                (Arc::new(inner_struct_fields[1].clone()), e_array),
+            ]);
+            let b_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+            let c_array: ArrayRef = Arc::new(c_struct);
+            Arc::new(StructArray::from(vec![
+                (Arc::new(outer_struct_fields[0].clone()), b_array),
+                (Arc::new(outer_struct_fields[1].clone()), c_array),
+            ]))
+        };
+
+        let row_number_field = Arc::new(
+            Field::new("row_index", DataType::Int64, false)
+                .with_extension_type(RowNumber),
+        );
+
+        // File schema: x: int32, a: {b: int32, c: {d: int32, e: int32}}, y: 
int32
+        // Table schema: x, a, y, row_index (indices 0, 1, 2, 3)
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("x", DataType::Int32, false),
+            Field::new("a", outer_struct_type.clone(), false),
+            Field::new("y", DataType::Int32, false),
+        ]));
+
+        let x_array: ArrayRef = Arc::new(Int32Array::from(vec![100, 200, 
300]));
+        let y_array: ArrayRef = Arc::new(Int32Array::from(vec![1000, 2000, 
3000]));
+
+        let parquet_data = arrow::record_batch::RecordBatch::try_new(
+            file_schema.clone(),
+            vec![x_array, a_struct, y_array],
+        )
+        .unwrap();
+
+        let table_schema = TableSchema::new_with_virtual_columns(
+            file_schema,
+            vec![row_number_field],
+            vec![],
+        );
+
+        // Test 1: Read all columns including row_index
+        {
+            let batch = read_parquet(
+                parquet_data.clone(),
+                table_schema.clone(),
+                ReadOptions::default(),
+            )
+            .await;
+
+            assert_eq!(batch.schema().fields().len(), 4);
+            assert_eq!(batch.schema().field(0).name(), "x");
+            assert_eq!(batch.schema().field(1).name(), "a");
+            assert_eq!(batch.schema().field(2).name(), "y");
+            assert_eq!(batch.schema().field(3).name(), "row_index");
+
+            let row_index_values = batch
+                .column(3)
+                .as_primitive::<Int64Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(row_index_values, vec![0, 1, 2]);
+
+            let a_col = batch.column(1).as_struct();
+            let b_values = a_col
+                .column(0)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(b_values, vec![1, 2, 3]);
+        }
+
+        // Test 2: Project nested struct and row_index only
+        {
+            let batch = read_parquet(
+                parquet_data.clone(),
+                table_schema.clone(),
+                ReadOptions {
+                    projection: Some(vec![1, 3]),
+                    ..Default::default()
+                },
+            )
+            .await;
+
+            assert_eq!(batch.schema().fields().len(), 2);
+            assert_eq!(batch.schema().field(0).name(), "a");
+            assert_eq!(batch.schema().field(1).name(), "row_index");
+
+            let row_index_values = batch
+                .column(1)
+                .as_primitive::<Int64Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(row_index_values, vec![0, 1, 2]);
+
+            let a_col = batch.column(0).as_struct();
+            let c_col = a_col.column(1).as_struct();
+            let d_values = c_col
+                .column(0)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(d_values, vec![10, 20, 30]);
+        }
+
+        // Test 3: Project only primitive columns with row_index (skip nested 
struct)
+        {
+            let batch = read_parquet(
+                parquet_data.clone(),
+                table_schema.clone(),
+                ReadOptions {
+                    projection: Some(vec![0, 2, 3]), // x, y, row_index - skip 
'a'
+                    ..Default::default()
+                },
+            )
+            .await;
+
+            assert_eq!(batch.schema().fields().len(), 3);
+            assert_eq!(batch.schema().field(0).name(), "x");
+            assert_eq!(batch.schema().field(1).name(), "y");
+            assert_eq!(batch.schema().field(2).name(), "row_index");
+
+            let x_values = batch
+                .column(0)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            let y_values = batch
+                .column(1)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            let row_index_values = batch
+                .column(2)
+                .as_primitive::<Int64Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(x_values, vec![100, 200, 300]);
+            assert_eq!(y_values, vec![1000, 2000, 3000]);
+            assert_eq!(row_index_values, vec![0, 1, 2]);
+        }
+
+        // Test 4: Project only the nested column (without row_index)
+        {
+            let batch = read_parquet(
+                parquet_data.clone(),
+                table_schema.clone(),
+                ReadOptions {
+                    projection: Some(vec![1]),
+                    ..Default::default()
+                },
+            )
+            .await;
+
+            assert_eq!(batch.schema().fields().len(), 1);
+            assert_eq!(batch.schema().field(0).name(), "a");
+
+            let a_col = batch.column(0).as_struct();
+            let b_values = a_col
+                .column(0)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(b_values, vec![1, 2, 3]);
+        }
+
+        // Test 5: Project columns in different order with row_index
+        {
+            let batch = read_parquet(
+                parquet_data.clone(),
+                table_schema.clone(),
+                ReadOptions {
+                    projection: Some(vec![3, 2, 0, 1]), // row_index, y, x, a 
(reordered)
+                    ..Default::default()
+                },
+            )
+            .await;
+
+            assert_eq!(batch.schema().fields().len(), 4);
+            assert_eq!(batch.schema().field(0).name(), "row_index");
+            assert_eq!(batch.schema().field(1).name(), "y");
+            assert_eq!(batch.schema().field(2).name(), "x");
+            assert_eq!(batch.schema().field(3).name(), "a");
+
+            let row_index_values = batch
+                .column(0)
+                .as_primitive::<Int64Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            let y_values = batch
+                .column(1)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            let x_values = batch
+                .column(2)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(row_index_values, vec![0, 1, 2]);
+            assert_eq!(y_values, vec![1000, 2000, 3000]);
+            assert_eq!(x_values, vec![100, 200, 300]);
+
+            let a_col = batch.column(3).as_struct();
+            let b_values = a_col
+                .column(0)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(b_values, vec![1, 2, 3]);
+        }
+
+        // Test 6: Project only row_index
+        {
+            let batch = read_parquet(
+                parquet_data.clone(),
+                table_schema.clone(),
+                ReadOptions {
+                    projection: Some(vec![3]),
+                    ..Default::default()
+                },
+            )
+            .await;
+
+            assert_eq!(batch.schema().fields().len(), 1);
+            assert_eq!(batch.schema().field(0).name(), "row_index");
+
+            let row_index_values = batch
+                .column(0)
+                .as_primitive::<Int64Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(row_index_values, vec![0, 1, 2]);
+        }
+    }
+
+    #[tokio::test]
+    async fn test_predicate_with_virtual_columns() {
+        let parquet_data = record_batch!((
+            "a",
+            Int32,
+            vec![Some(10), Some(20), Some(30), Some(40), Some(50)]
+        ))
+        .unwrap();
+
+        let row_number_field = Arc::new(
+            Field::new("row_index", DataType::Int64, false)
+                .with_extension_type(RowNumber),
+        );
+        let table_schema = TableSchema::new_with_virtual_columns(
+            parquet_data.schema(),
+            vec![row_number_field],
+            vec![],
+        );
+
+        // Test 1: Filter on file column (a > 20) with virtual column in schema
+        {
+            let expr = col("a").gt(lit(20));
+            let predicate = logical2physical(&expr, 
table_schema.table_schema());
+
+            let batch = read_parquet(
+                parquet_data.clone(),
+                table_schema.clone(),
+                ReadOptions {
+                    predicate: Some(predicate),
+                    ..Default::default()
+                },
+            )
+            .await;
+
+            let a_values = batch
+                .column(0)
+                .as_primitive::<Int32Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(a_values, vec![30, 40, 50]);
+
+            let row_index_values = batch
+                .column(1)
+                .as_primitive::<Int64Type>()
+                .into_iter()
+                .flatten()
+                .collect::<Vec<_>>();
+            assert_eq!(row_index_values, vec![2, 3, 4]);
+        }
+
+        // Test 2: Filter on virtual column does not have predicate pushdown

Review Comment:
   There is no filtering on virtual columns in the Parquet source, so predicate pushdown does not apply to them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to