This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 3d5428da84 General virtual columns support + row numbers as a first use-case (#8715)
3d5428da84 is described below

commit 3d5428da8478e8a74fa8fca3adbe1cb2166b5de5
Author: Vukasin Stefanovic <[email protected]>
AuthorDate: Fri Nov 14 22:17:29 2025 +0100

    General virtual columns support + row numbers as a first use-case (#8715)
    
    Based on https://github.com/apache/arrow-rs/pull/7307.
    
    # Which issue does this PR close?
    
    - Closes https://github.com/apache/arrow-rs/issues/7299
    
    # Rationale for this change
    
    We need row numbers for many downstream features, e.g. computing a unique
    row identifier in Iceberg.
    
    # What changes are included in this PR?
    
    New API to get row numbers as a virtual column:
    ```
    let file = File::open(path).unwrap();
    let row_number_field = Arc::new(
        Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
    );
    let options = ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field]);
    let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
        .unwrap()
        .build()
        .expect("Could not create reader");
    reader
        .collect::<Result<Vec<_>, _>>()
        .expect("Could not read");
    ```
    The column is defined as an Arrow extension type.
    Parquet metadata is propagated to the array reader builder so it can compute
    the first row index of each row group.
    A new `Virtual` field type is added alongside `Primitive` and `Group`.
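
    The row numbers themselves are derived purely from row group metadata: walk the
    file's row groups in order, keep a running offset, and emit one contiguous range
    per selected row group. A minimal sketch of that idea (illustrative only, not
    the exact `RowNumberReader` code; `num_rows_per_group` and `selected` stand in
    for the per-row-group metadata):
    ```
    /// Sketch: given the row counts of all row groups in file order and the
    /// indices of the selected row groups, return the absolute row numbers.
    fn row_numbers(num_rows_per_group: &[i64], selected: &[usize]) -> Vec<i64> {
        // first row index of group i = sum of row counts of all preceding groups
        let mut next = 0i64;
        let mut starts = Vec::with_capacity(num_rows_per_group.len());
        for &n in num_rows_per_group {
            starts.push(next);
            next += n;
        }
        // One contiguous range per selected row group, flattened in order
        selected
            .iter()
            .flat_map(|&i| starts[i]..starts[i] + num_rows_per_group[i])
            .collect()
    }

    fn main() {
        // Two row groups of 3 and 2 rows; selecting only the second one
        assert_eq!(row_numbers(&[3, 2], &[1]), vec![3, 4]);
    }
    ```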
    
    # Are these changes tested?
    
    Yes
    
    # Are there any user-facing changes?
    
    This is a user-facing feature, and docstrings have been added.
    There should be no breaking changes: rather than changing the signature of an
    existing public method, a new variant of it was added that accepts the extra
    parameters.
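
    For example, the existing schema-conversion entry point keeps its signature and
    simply delegates to the new `_with_virtual` variant with an empty slice of
    virtual columns (mirroring the `parquet/src/arrow/schema/mod.rs` hunk below):
    ```
    pub fn parquet_to_arrow_field_levels(
        schema: &SchemaDescriptor,
        mask: ProjectionMask,
        hint: Option<&Fields>,
    ) -> Result<FieldLevels> {
        // Same behaviour as before: no virtual columns requested
        parquet_to_arrow_field_levels_with_virtual(schema, mask, hint, &[])
    }
    ```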
    
    ---------
    
    Co-authored-by: Jonas Irgens Kylling <[email protected]>
    Co-authored-by: scovich <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 parquet/examples/read_with_rowgroup.rs             |  42 ++-
 parquet/src/arrow/array_reader/builder.rs          |  71 +++-
 parquet/src/arrow/array_reader/list_array.rs       |   1 +
 parquet/src/arrow/array_reader/mod.rs              |  23 +-
 parquet/src/arrow/array_reader/row_number.rs       | 108 ++++++
 parquet/src/arrow/arrow_reader/mod.rs              | 377 ++++++++++++++++++++-
 parquet/src/arrow/async_reader/mod.rs              |  66 +++-
 parquet/src/arrow/in_memory_row_group.rs           |  10 +-
 parquet/src/arrow/mod.rs                           |   5 +-
 .../src/arrow/push_decoder/reader_builder/mod.rs   |   4 +-
 parquet/src/arrow/schema/complex.rs                |  63 +++-
 parquet/src/arrow/schema/mod.rs                    | 166 ++++++++-
 parquet/src/arrow/schema/virtual_type.rs           | 152 +++++++++
 parquet/src/file/metadata/thrift/mod.rs            |  65 +++-
 14 files changed, 1104 insertions(+), 49 deletions(-)

diff --git a/parquet/examples/read_with_rowgroup.rs b/parquet/examples/read_with_rowgroup.rs
index a4c5fdc04d..e3c714c280 100644
--- a/parquet/examples/read_with_rowgroup.rs
+++ b/parquet/examples/read_with_rowgroup.rs
@@ -22,7 +22,7 @@ use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ProjectionMask, parquet_to_arrow_field_levels};
 use parquet::column::page::{PageIterator, PageReader};
 use parquet::errors::{ParquetError, Result};
-use parquet::file::metadata::RowGroupMetaData;
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use parquet::file::reader::{ChunkReader, Length};
 use parquet::file::serialized_reader::SerializedPageReader;
 use std::sync::Arc;
@@ -35,10 +35,11 @@ async fn main() -> Result<()> {
     let mut file = File::open(&path).await.unwrap();
 
     // The metadata could be cached in other places, this example only shows how to read
-    let metadata = file.get_metadata(None).await?;
+    let metadata = Arc::try_unwrap(file.get_metadata(None).await?).unwrap();
 
-    for rg in metadata.row_groups() {
-        let mut rowgroup = InMemoryRowGroup::create(rg.clone(), ProjectionMask::all());
+    for row_group_idx in 0..metadata.row_groups().len() {
+        let mut rowgroup =
+            InMemoryRowGroup::create(metadata.clone(), row_group_idx, ProjectionMask::all());
         rowgroup.async_fetch_data(&mut file, None).await?;
         let reader = rowgroup.build_reader(1024, None)?;
 
@@ -100,14 +101,15 @@ impl ChunkReader for ColumnChunkData {
 
 #[derive(Clone)]
 pub struct InMemoryRowGroup {
-    pub metadata: RowGroupMetaData,
+    metadata: ParquetMetaData,
+    row_group_idx: usize,
     mask: ProjectionMask,
     column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
 }
 
 impl RowGroups for InMemoryRowGroup {
     fn num_rows(&self) -> usize {
-        self.metadata.num_rows() as usize
+        self.row_group_metadata().num_rows() as usize
     }
 
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
@@ -118,7 +120,7 @@ impl RowGroups for InMemoryRowGroup {
             Some(data) => {
                 let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
                     data.clone(),
-                    self.metadata.column(i),
+                    self.row_group_metadata().column(i),
                     self.num_rows(),
                     None,
                 )?);
@@ -129,26 +131,44 @@ impl RowGroups for InMemoryRowGroup {
             }
         }
     }
+
+    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
+        Box::new(std::iter::once(self.row_group_metadata()))
+    }
+
+    fn metadata(&self) -> &ParquetMetaData {
+        &self.metadata
+    }
 }
 
 impl InMemoryRowGroup {
-    pub fn create(metadata: RowGroupMetaData, mask: ProjectionMask) -> Self {
-        let column_chunks = metadata.columns().iter().map(|_| None).collect::<Vec<_>>();
+    pub fn create(metadata: ParquetMetaData, row_group_idx: usize, mask: ProjectionMask) -> Self {
+        let column_chunks = metadata
+            .row_group(row_group_idx)
+            .columns()
+            .iter()
+            .map(|_| None)
+            .collect::<Vec<_>>();
 
         Self {
             metadata,
+            row_group_idx,
             mask,
             column_chunks,
         }
     }
 
+    pub fn row_group_metadata(&self) -> &RowGroupMetaData {
+        self.metadata.row_group(self.row_group_idx)
+    }
+
     pub fn build_reader(
         &self,
         batch_size: usize,
         selection: Option<RowSelection>,
     ) -> Result<ParquetRecordBatchReader> {
         let levels = parquet_to_arrow_field_levels(
-            &self.metadata.schema_descr_ptr(),
+            &self.row_group_metadata().schema_descr_ptr(),
             self.mask.clone(),
             None,
         )?;
@@ -163,7 +183,7 @@ impl InMemoryRowGroup {
         _selection: Option<&RowSelection>,
     ) -> Result<()> {
         let mut vs = std::mem::take(&mut self.column_chunks);
-        for (leaf_idx, meta) in self.metadata.columns().iter().enumerate() {
+        for (leaf_idx, meta) in self.row_group_metadata().columns().iter().enumerate() {
             if self.mask.leaf_included(leaf_idx) {
                 let (start, len) = meta.byte_range();
                 let data = reader.get_bytes(start..(start + len)).await?;
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 6107ba4f25..82c8e77f63 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -26,16 +26,18 @@ use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
 use crate::arrow::array_reader::empty_array::make_empty_array_reader;
 use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
 use crate::arrow::array_reader::row_group_cache::RowGroupCache;
+use crate::arrow::array_reader::row_number::RowNumberReader;
 use crate::arrow::array_reader::{
     ArrayReader, FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
     PrimitiveArrayReader, RowGroups, StructArrayReader, make_byte_array_dictionary_reader,
     make_byte_array_reader,
 };
 use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
-use crate::arrow::schema::{ParquetField, ParquetFieldType};
+use crate::arrow::schema::{ParquetField, ParquetFieldType, VirtualColumnType};
 use crate::basic::Type as PhysicalType;
 use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type};
 use crate::errors::{ParquetError, Result};
+use crate::file::metadata::ParquetMetaData;
 use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
 
 /// Builder for [`CacheOptions`]
@@ -89,6 +91,8 @@ pub struct ArrayReaderBuilder<'a> {
     row_groups: &'a dyn RowGroups,
     /// Optional cache options for the array reader
     cache_options: Option<&'a CacheOptions<'a>>,
+    /// Parquet metadata for computing virtual column values
+    parquet_metadata: Option<&'a ParquetMetaData>,
     /// metrics
     metrics: &'a ArrowReaderMetrics,
 }
@@ -98,6 +102,7 @@ impl<'a> ArrayReaderBuilder<'a> {
         Self {
             row_groups,
             cache_options: None,
+            parquet_metadata: None,
             metrics,
         }
     }
@@ -108,6 +113,12 @@ impl<'a> ArrayReaderBuilder<'a> {
         self
     }
 
+    /// Add parquet metadata to the builder for computing virtual column values
+    pub fn with_parquet_metadata(mut self, parquet_metadata: &'a ParquetMetaData) -> Self {
+        self.parquet_metadata = Some(parquet_metadata);
+        self
+    }
+
     /// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader.
     pub fn build_array_reader(
         &self,
@@ -153,6 +164,13 @@ impl<'a> ArrayReaderBuilder<'a> {
                     Ok(Some(reader))
                 }
             }
+            ParquetFieldType::Virtual(virtual_type) => {
+                // Virtual columns don't have data in the parquet file
+                // They need to be built by specialized readers
+                match virtual_type {
+                    VirtualColumnType::RowNumber => Ok(Some(self.build_row_number_reader()?)),
+                }
+            }
             ParquetFieldType::Group { .. } => match &field.arrow_type {
                 DataType::Map(_, _) => self.build_map_reader(field, mask),
                 DataType::Struct(_) => self.build_struct_reader(field, mask),
@@ -164,6 +182,18 @@ impl<'a> ArrayReaderBuilder<'a> {
         }
     }
 
+    fn build_row_number_reader(&self) -> Result<Box<dyn ArrayReader>> {
+        let parquet_metadata = self.parquet_metadata.ok_or_else(|| {
+            ParquetError::General(
+                "ParquetMetaData is required to read virtual row number 
columns.".to_string(),
+            )
+        })?;
+        Ok(Box::new(RowNumberReader::try_new(
+            parquet_metadata,
+            self.row_groups.row_groups(),
+        )?))
+    }
+
     /// Build array reader for map type.
     fn build_map_reader(
         &self,
@@ -439,6 +469,7 @@ impl<'a> ArrayReaderBuilder<'a> {
 mod tests {
     use super::*;
     use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
+    use crate::arrow::schema::virtual_type::RowNumber;
     use crate::file::reader::{FileReader, SerializedFileReader};
     use crate::util::test_common::file_util::get_test_file;
     use arrow::datatypes::Field;
@@ -455,6 +486,7 @@ mod tests {
             file_metadata.schema_descr(),
             ProjectionMask::all(),
             file_metadata.key_value_metadata(),
+            &[],
         )
         .unwrap();
 
@@ -472,4 +504,41 @@ mod tests {
 
         assert_eq!(array_reader.get_data_type(), &arrow_type);
     }
+
+    #[test]
+    fn test_create_array_reader_with_row_numbers() {
+        let file = get_test_file("nulls.snappy.parquet");
+        let file_reader: Arc<dyn FileReader> = Arc::new(SerializedFileReader::new(file).unwrap());
+
+        let file_metadata = file_reader.metadata().file_metadata();
+        let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
+        let row_number_field = Arc::new(
+            Field::new("row_number", DataType::Int64, 
false).with_extension_type(RowNumber),
+        );
+        let (_, fields) = parquet_to_arrow_schema_and_fields(
+            file_metadata.schema_descr(),
+            ProjectionMask::all(),
+            file_metadata.key_value_metadata(),
+            std::slice::from_ref(&row_number_field),
+        )
+        .unwrap();
+
+        let metrics = ArrowReaderMetrics::disabled();
+        let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
+            .with_parquet_metadata(file_reader.metadata())
+            .build_array_reader(fields.as_ref(), &mask)
+            .unwrap();
+
+        // Create arrow types
+        let arrow_type = DataType::Struct(Fields::from(vec![
+            Field::new(
+                "b_struct",
+                DataType::Struct(vec![Field::new("b_c_int", DataType::Int32, true)].into()),
+                true,
+            ),
+            (*row_number_field).clone(),
+        ]));
+
+        assert_eq!(array_reader.get_data_type(), &arrow_type);
+    }
 }
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index 487c2bdd56..ff1b414c27 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -561,6 +561,7 @@ mod tests {
             schema,
             ProjectionMask::all(),
             file_metadata.key_value_metadata(),
+            &[],
         )
         .unwrap();
 
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index b3595e58d6..54be89f230 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -27,6 +27,7 @@ use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::record_reader::buffer::ValuesBuffer;
 use crate::column::page::PageIterator;
 use crate::column::reader::decoder::ColumnValueDecoder;
+use crate::file::metadata::ParquetMetaData;
 use crate::file::reader::{FilePageIterator, FileReader};
 
 mod builder;
@@ -42,12 +43,14 @@ mod map_array;
 mod null_array;
 mod primitive_array;
 mod row_group_cache;
+mod row_number;
 mod struct_array;
 
 #[cfg(test)]
 mod test_util;
 
 // Note that this crate is public under the `experimental` feature flag.
+use crate::file::metadata::RowGroupMetaData;
 pub use builder::{ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder};
 pub use byte_array::make_byte_array_reader;
 pub use byte_array_dictionary::make_byte_array_dictionary_reader;
@@ -139,17 +142,35 @@ pub trait RowGroups {
     /// Returns a [`PageIterator`] for all pages in the specified column chunk
     /// across all row groups in this collection.
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
+
+    /// Returns an iterator over the row groups in this collection
+    ///
+    /// Note this may not include all row groups in [`Self::metadata`].
+    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_>;
+
+    /// Returns the parquet metadata
+    fn metadata(&self) -> &ParquetMetaData;
 }
 
 impl RowGroups for Arc<dyn FileReader> {
     fn num_rows(&self) -> usize {
-        self.metadata().file_metadata().num_rows() as usize
+        FileReader::metadata(self.as_ref())
+            .file_metadata()
+            .num_rows() as usize
     }
 
     fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
         let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
         Ok(Box::new(iterator))
     }
+
+    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
+        Box::new(FileReader::metadata(self.as_ref()).row_groups().iter())
+    }
+
+    fn metadata(&self) -> &ParquetMetaData {
+        FileReader::metadata(self.as_ref())
+    }
 }
 
 /// Uses `record_reader` to read up to `batch_size` records from `pages`
diff --git a/parquet/src/arrow/array_reader/row_number.rs b/parquet/src/arrow/array_reader/row_number.rs
new file mode 100644
index 0000000000..e8116b2936
--- /dev/null
+++ b/parquet/src/arrow/array_reader/row_number.rs
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::{ParquetError, Result};
+use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use arrow_array::{ArrayRef, Int64Array};
+use arrow_schema::DataType;
+use std::any::Any;
+use std::collections::HashSet;
+use std::sync::Arc;
+
+pub(crate) struct RowNumberReader {
+    buffered_row_numbers: Vec<i64>,
+    remaining_row_numbers: std::iter::Flatten<std::vec::IntoIter<std::ops::Range<i64>>>,
+}
+
+impl RowNumberReader {
+    pub(crate) fn try_new<'a>(
+        parquet_metadata: &'a ParquetMetaData,
+        row_groups: impl Iterator<Item = &'a RowGroupMetaData>,
+    ) -> Result<Self> {
+        // Collect ordinals from the selected row groups
+        let selected_ordinals: HashSet<i16> = row_groups
+            .map(|rg| {
+                rg.ordinal().ok_or_else(|| {
+                    ParquetError::General(
+                        "Row group missing ordinal field, required to compute 
row numbers"
+                            .to_string(),
+                    )
+                })
+            })
+            .collect::<Result<_>>()?;
+
+        // Iterate through all row groups once, computing first_row_index and creating ranges
+        // This is O(M) where M is total row groups, much better than O(N * O) where N is selected
+        let mut first_row_index: i64 = 0;
+        let mut ranges = Vec::new();
+
+        for rg in parquet_metadata.row_groups() {
+            if let Some(ordinal) = rg.ordinal() {
+                if selected_ordinals.contains(&ordinal) {
+                    ranges.push((ordinal, first_row_index..first_row_index + rg.num_rows()));
+                }
+            }
+            first_row_index += rg.num_rows();
+        }
+
+        // Sort ranges by ordinal to maintain original row group order
+        ranges.sort_by_key(|(ordinal, _)| *ordinal);
+        let ranges: Vec<_> = ranges.into_iter().map(|(_, range)| range).collect();
+
+        Ok(Self {
+            buffered_row_numbers: Vec::new(),
+            remaining_row_numbers: ranges.into_iter().flatten(),
+        })
+    }
+}
+
+impl ArrayReader for RowNumberReader {
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let starting_len = self.buffered_row_numbers.len();
+        self.buffered_row_numbers
+            .extend((&mut self.remaining_row_numbers).take(batch_size));
+        Ok(self.buffered_row_numbers.len() - starting_len)
+    }
+
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        // TODO: Use advance_by when it stabilizes to improve performance
+        Ok((&mut self.remaining_row_numbers).take(num_records).count())
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &DataType {
+        &DataType::Int64
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        Ok(Arc::new(Int64Array::from_iter(
+            self.buffered_row_numbers.drain(..),
+        )))
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        None
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        None
+    }
+}
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 94a3cf9e34..16e01a6ebe 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -19,7 +19,7 @@
 
 use arrow_array::cast::AsArray;
 use arrow_array::{Array, RecordBatch, RecordBatchReader};
-use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
+use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
 use arrow_select::filter::filter_record_batch;
 pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
 pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
@@ -28,8 +28,10 @@ use std::sync::Arc;
 
 pub use crate::arrow::array_reader::RowGroups;
 use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
-use crate::arrow::schema::{ParquetField, parquet_to_arrow_schema_and_fields};
-use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels};
+use crate::arrow::schema::{
+    ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
+};
+use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
 use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
 use crate::bloom_filter::{
     SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
@@ -40,6 +42,7 @@ use crate::encryption::decrypt::FileDecryptionProperties;
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{
     PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
+    RowGroupMetaData,
 };
 use crate::file::reader::{ChunkReader, SerializedPageReader};
 use crate::schema::types::SchemaDescriptor;
@@ -426,6 +429,8 @@ pub struct ArrowReaderOptions {
     /// If encryption is enabled, the file decryption properties can be provided
     #[cfg(feature = "encryption")]
     pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
+
+    virtual_columns: Vec<FieldRef>,
 }
 
 impl ArrowReaderOptions {
@@ -566,6 +571,73 @@ impl ArrowReaderOptions {
         }
     }
 
+    /// Include virtual columns in the output.
+    ///
+    /// Virtual columns are columns that are not part of the Parquet schema, but are added to the output by the reader such as row numbers.
+    ///
+    /// # Example
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+    /// # use arrow_schema::{DataType, Field, Schema};
+    /// # use parquet::arrow::{ArrowWriter, RowNumber};
+    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
+    /// # use tempfile::tempfile;
+    /// #
+    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
+    /// // Create a simple record batch with some data
+    /// let values = Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef;
+    /// let batch = RecordBatch::try_from_iter(vec![("value", values)])?;
+    ///
+    /// // Write the batch to a temporary parquet file
+    /// let file = tempfile()?;
+    /// let mut writer = ArrowWriter::try_new(
+    ///     file.try_clone()?,
+    ///     batch.schema(),
+    ///     None
+    /// )?;
+    /// writer.write(&batch)?;
+    /// writer.close()?;
+    ///
+    /// // Create a virtual column for row numbers
+    /// let row_number_field = Arc::new(Field::new("row_number", DataType::Int64, false)
+    ///     .with_extension_type(RowNumber));
+    ///
+    /// // Configure options with virtual columns
+    /// let options = ArrowReaderOptions::new()
+    ///     .with_virtual_columns(vec![row_number_field]);
+    ///
+    /// // Create a reader with the options
+    /// let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
+    ///     file,
+    ///     options
+    /// )?
+    /// .build()?;
+    ///
+    /// // Read the batch - it will include both the original column and the virtual row_number column
+    /// let result_batch = reader.next().unwrap()?;
+    /// assert_eq!(result_batch.num_columns(), 2); // "value" + "row_number"
+    /// assert_eq!(result_batch.num_rows(), 3);
+    /// #
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Self {
+        // Validate that all fields are virtual columns
+        for field in &virtual_columns {
+            if !is_virtual_column(field) {
+                panic!(
+                    "Field '{}' is not a virtual column. Virtual columns must 
have extension type names starting with 'arrow.virtual.'",
+                    field.name()
+                );
+            }
+        }
+        Self {
+            virtual_columns,
+            ..self
+        }
+    }
+
     /// Retrieve the currently set page index behavior.
     ///
     /// This can be set via [`with_page_index`][Self::with_page_index].
@@ -645,7 +717,11 @@ impl ArrowReaderMetadata {
     /// of the settings in `options`. See [`Self::load`] to load metadata including the page index if needed.
     pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
         match options.supplied_schema {
-            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
+            Some(supplied_schema) => Self::with_supplied_schema(
+                metadata,
+                supplied_schema.clone(),
+                &options.virtual_columns,
+            ),
             None => {
                 let kv_metadata = match options.skip_arrow_metadata {
                     true => None,
@@ -656,6 +732,7 @@ impl ArrowReaderMetadata {
                     metadata.file_metadata().schema_descr(),
                     ProjectionMask::all(),
                     kv_metadata,
+                    &options.virtual_columns,
                 )?;
 
                 Ok(Self {
@@ -670,16 +747,18 @@ impl ArrowReaderMetadata {
     fn with_supplied_schema(
         metadata: Arc<ParquetMetaData>,
         supplied_schema: SchemaRef,
+        virtual_columns: &[FieldRef],
     ) -> Result<Self> {
         let parquet_schema = metadata.file_metadata().schema_descr();
-        let field_levels = parquet_to_arrow_field_levels(
+        let field_levels = parquet_to_arrow_field_levels_with_virtual(
             parquet_schema,
             ProjectionMask::all(),
             Some(supplied_schema.fields()),
+            virtual_columns,
         )?;
         let fields = field_levels.fields;
         let inferred_len = fields.len();
-        let supplied_len = supplied_schema.fields().len();
+        let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
         // Ensure the supplied schema has the same number of columns as the parquet schema.
         // parquet_to_arrow_field_levels is expected to throw an error if the schemas have
         // different lengths, but we check here to be safe.
@@ -962,6 +1041,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
                 cache_projection.intersect(&projection);
 
                 let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
+                    .with_parquet_metadata(&reader.metadata)
                     .build_array_reader(fields.as_deref(), predicate.projection())?;
 
                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
@@ -969,6 +1049,7 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
         }
 
         let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
+            .with_parquet_metadata(&reader.metadata)
             .build_array_reader(fields.as_deref(), &projection)?;
 
         let read_plan = plan_builder
@@ -1007,6 +1088,18 @@ impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
             row_groups: self.row_groups.clone().into_iter(),
         }))
     }
+
+    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
+        Box::new(
+            self.row_groups
+                .iter()
+                .map(move |i| self.metadata.row_group(*i)),
+        )
+    }
+
+    fn metadata(&self) -> &ParquetMetaData {
+        self.metadata.as_ref()
+    }
 }
 
 struct ReaderPageIterator<T: ChunkReader> {
@@ -1263,6 +1356,7 @@ impl ParquetRecordBatchReader {
         // note metrics are not supported in this API
         let metrics = ArrowReaderMetrics::disabled();
         let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
+            .with_parquet_metadata(row_groups.metadata())
            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;
 
         let read_plan = ReadPlanBuilder::new(batch_size)
@@ -1299,7 +1393,7 @@ impl ParquetRecordBatchReader {
 }
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
     use std::cmp::min;
     use std::collections::{HashMap, VecDeque};
     use std::fmt::Formatter;
@@ -1308,11 +1402,16 @@ mod tests {
     use std::path::PathBuf;
     use std::sync::Arc;
 
+    use rand::rngs::StdRng;
+    use rand::{Rng, RngCore, SeedableRng, random, rng};
+    use tempfile::tempfile;
+
     use crate::arrow::arrow_reader::{
-        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
-        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelectionPolicy, RowSelector,
+        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions,
+        ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection,
+        RowSelectionPolicy, RowSelector,
     };
-    use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
+    use crate::arrow::schema::{add_encoded_arrow_schema_to_metadata, virtual_type::RowNumber};
     use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
     use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
@@ -1346,8 +1445,6 @@ mod tests {
     use bytes::Bytes;
     use half::f16;
     use num_traits::PrimInt;
-    use rand::{Rng, RngCore, rng};
-    use tempfile::tempfile;
 
     #[test]
     fn test_arrow_reader_all_columns() {
@@ -3100,7 +3197,7 @@ mod tests {
                 assert_eq!(end - total_read, batch.num_rows());
 
                 let a = converter(&expected_data[total_read..end]);
-                let b = Arc::clone(batch.column(0));
+                let b = batch.column(0);
 
                 assert_eq!(a.data_type(), b.data_type());
                 assert_eq!(a.to_data(), b.to_data());
@@ -5360,4 +5457,258 @@ mod tests {
         assert_eq!(out.num_rows(), 3);
         assert_eq!(out.num_columns(), 2);
     }
+
+    #[test]
+    fn test_read_row_numbers() {
+        let file = write_parquet_from_iter(vec![(
+            "value",
+            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
+        )]);
+        let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);
+
+        let row_number_field = Arc::new(
+            Field::new("row_number", ArrowDataType::Int64, 
false).with_extension_type(RowNumber),
+        );
+
+        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
+        let options = options.with_virtual_columns(vec![row_number_field.clone()]);
+        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
+            file.try_clone().unwrap(),
+            options,
+        )
+        .expect("reader builder with schema")
+        .build()
+        .expect("reader with schema");
+
+        let batch = arrow_reader.next().unwrap().unwrap();
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("value", ArrowDataType::Int64, false),
+            (*row_number_field).clone(),
+        ]));
+
+        assert_eq!(batch.schema(), schema);
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.num_rows(), 3);
+        assert_eq!(
+            batch
+                .column(0)
+                .as_primitive::<types::Int64Type>()
+                .iter()
+                .collect::<Vec<_>>(),
+            vec![Some(1), Some(2), Some(3)]
+        );
+        assert_eq!(
+            batch
+                .column(1)
+                .as_primitive::<types::Int64Type>()
+                .iter()
+                .collect::<Vec<_>>(),
+            vec![Some(0), Some(1), Some(2)]
+        );
+    }
+
+    #[test]
+    fn test_read_only_row_numbers() {
+        let file = write_parquet_from_iter(vec![(
+            "value",
+            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
+        )]);
+        let row_number_field = Arc::new(
+            Field::new("row_number", ArrowDataType::Int64, 
false).with_extension_type(RowNumber),
+        );
+        let options =
+            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()]);
+        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
+        let num_columns = metadata
+            .metadata
+            .file_metadata()
+            .schema_descr()
+            .num_columns();
+
+        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
+            .with_projection(ProjectionMask::none(num_columns))
+            .build()
+            .expect("reader with schema");
+
+        let batch = arrow_reader.next().unwrap().unwrap();
+        let schema = Arc::new(Schema::new(vec![row_number_field]));
+
+        assert_eq!(batch.schema(), schema);
+        assert_eq!(batch.num_columns(), 1);
+        assert_eq!(batch.num_rows(), 3);
+        assert_eq!(
+            batch
+                .column(0)
+                .as_primitive::<types::Int64Type>()
+                .iter()
+                .collect::<Vec<_>>(),
+            vec![Some(0), Some(1), Some(2)]
+        );
+    }
+
+    #[test]
+    #[should_panic(expected = "is not a virtual column")]
+    fn test_with_virtual_columns_rejects_non_virtual_fields() {
+        // Try to pass a regular field (not a virtual column) to with_virtual_columns
+        let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
+        let _options = ArrowReaderOptions::new().with_virtual_columns(vec![regular_field]);
+    }
+
+    #[test]
+    fn test_row_numbers_with_multiple_row_groups() {
+        test_row_numbers_with_multiple_row_groups_helper(
+            false,
+            |path, selection, _row_filter, batch_size| {
+                let file = File::open(path).unwrap();
+                let row_number_field = Arc::new(
+                    Field::new("row_number", ArrowDataType::Int64, false)
+                        .with_extension_type(RowNumber),
+                );
+                let options =
+                    ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field]);
+                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
+                    .unwrap()
+                    .with_row_selection(selection)
+                    .with_batch_size(batch_size)
+                    .build()
+                    .expect("Could not create reader");
+                reader
+                    .collect::<Result<Vec<_>, _>>()
+                    .expect("Could not read")
+            },
+        );
+    }
+
+    #[test]
+    fn test_row_numbers_with_multiple_row_groups_and_filter() {
+        test_row_numbers_with_multiple_row_groups_helper(
+            true,
+            |path, selection, row_filter, batch_size| {
+                let file = File::open(path).unwrap();
+                let row_number_field = Arc::new(
+                    Field::new("row_number", ArrowDataType::Int64, false)
+                        .with_extension_type(RowNumber),
+                );
+                let options =
+                    ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field]);
+                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
+                    .unwrap()
+                    .with_row_selection(selection)
+                    .with_batch_size(batch_size)
+                    .with_row_filter(row_filter.expect("No filter"))
+                    .build()
+                    .expect("Could not create reader");
+                reader
+                    .collect::<Result<Vec<_>, _>>()
+                    .expect("Could not read")
+            },
+        );
+    }
+
+    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
+        use_filter: bool,
+        test_case: F,
+    ) where
+        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
+    {
+        let seed: u64 = random();
+        println!("test_row_numbers_with_multiple_row_groups seed: {}", seed);
+        let mut rng = StdRng::seed_from_u64(seed);
+
+        use tempfile::TempDir;
+        let tempdir = TempDir::new().expect("Could not create temp dir");
+
+        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);
+
+        let path = tempdir.path().join("test.parquet");
+        std::fs::write(&path, bytes).expect("Could not write file");
+
+        let mut case = vec![];
+        let mut remaining = metadata.file_metadata().num_rows();
+        while remaining > 0 {
+            let row_count = rng.random_range(1..=remaining);
+            remaining -= row_count;
+            case.push(RowSelector {
+                row_count: row_count as usize,
+                skip: rng.random_bool(0.5),
+            });
+        }
+
+        let filter = use_filter.then(|| {
+            let filter = (0..metadata.file_metadata().num_rows())
+                .map(|_| rng.random_bool(0.99))
+                .collect::<Vec<_>>();
+            let mut filter_offset = 0;
+            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
+                ProjectionMask::all(),
+                move |b| {
+                    let array = BooleanArray::from_iter(
+                        filter
+                            .iter()
+                            .skip(filter_offset)
+                            .take(b.num_rows())
+                            .map(|x| Some(*x)),
+                    );
+                    filter_offset += b.num_rows();
+                    Ok(array)
+                },
+            ))])
+        });
+
+        let selection = RowSelection::from(case);
+        let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));
+
+        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
+            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
+            return;
+        }
+        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
+            .expect("Failed to concatenate");
+        // assert_eq!(selection.row_count(), actual.num_rows());
+        let values = actual
+            .column(0)
+            .as_primitive::<types::Int64Type>()
+            .iter()
+            .collect::<Vec<_>>();
+        let row_numbers = actual
+            .column(1)
+            .as_primitive::<types::Int64Type>()
+            .iter()
+            .collect::<Vec<_>>();
+        assert_eq!(
+            row_numbers
+                .into_iter()
+                .map(|number| number.map(|number| number + 1))
+                .collect::<Vec<_>>(),
+            values
+        );
+    }
+
+    fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
+        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
+            "value",
+            ArrowDataType::Int64,
+            false,
+        )])));
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer =
+            ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");
+
+        let mut values = 1..=rng.random_range(1..4096);
+        while !values.is_empty() {
+            let batch_values = values
+                .by_ref()
+                .take(rng.random_range(1..4096))
+                .collect::<Vec<_>>();
+            let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
+            let batch =
+                RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
+            writer.write(&batch).expect("Could not write batch");
+            writer.flush().expect("Could not flush");
+        }
+        let metadata = writer.close().expect("Could not close writer");
+
+        (Bytes::from(buf), metadata)
+    }
 }
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 07edf4be2a..35acc5fbf1 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -498,9 +498,10 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
 
        // Ensure schema of ParquetRecordBatchStream respects projection, and does
        // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
+        let projection_len = projection.mask.as_ref().map_or(usize::MAX, |m| m.len());
         let projected_fields = schema
             .fields
-            .filter_leaves(|idx, _| projection.leaf_included(idx));
+            .filter_leaves(|idx, _| idx < projection_len && projection.leaf_included(idx));
         let projected_schema = Arc::new(Schema::new(projected_fields));
 
         let decoder = ParquetPushDecoderBuilder {
@@ -767,10 +768,12 @@ where
 mod tests {
     use super::*;
     use crate::arrow::arrow_reader::RowSelectionPolicy;
+    use crate::arrow::arrow_reader::tests::test_row_numbers_with_multiple_row_groups_helper;
     use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
     };
     use crate::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
+    use crate::arrow::schema::virtual_type::RowNumber;
     use crate::arrow::{ArrowWriter, ProjectionMask};
     use crate::file::metadata::ParquetMetaDataReader;
     use crate::file::properties::WriterProperties;
@@ -2183,4 +2186,65 @@ mod tests {
             92
         );
     }
+
+    #[test]
+    fn test_row_numbers_with_multiple_row_groups() {
+        test_row_numbers_with_multiple_row_groups_helper(
+            false,
+            |path, selection, _row_filter, batch_size| {
+                let runtime = tokio::runtime::Builder::new_current_thread()
+                    .enable_all()
+                    .build()
+                    .expect("Could not create runtime");
+                runtime.block_on(async move {
+                    let file = tokio::fs::File::open(path).await.unwrap();
+                    let row_number_field = Arc::new(
+                        Field::new("row_number", DataType::Int64, false)
+                            .with_extension_type(RowNumber),
+                    );
+                    let options =
+                        ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field]);
+                    let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
+                        .await
+                        .unwrap()
+                        .with_row_selection(selection)
+                        .with_batch_size(batch_size)
+                        .build()
+                        .expect("Could not create reader");
+                    reader.try_collect::<Vec<_>>().await.unwrap()
+                })
+            },
+        );
+    }
+
+    #[test]
+    fn test_row_numbers_with_multiple_row_groups_and_filter() {
+        test_row_numbers_with_multiple_row_groups_helper(
+            true,
+            |path, selection, row_filter, batch_size| {
+                let runtime = tokio::runtime::Builder::new_current_thread()
+                    .enable_all()
+                    .build()
+                    .expect("Could not create runtime");
+                runtime.block_on(async move {
+                    let file = tokio::fs::File::open(path).await.unwrap();
+                    let row_number_field = Arc::new(
+                        Field::new("row_number", DataType::Int64, false)
+                            .with_extension_type(RowNumber),
+                    );
+                    let options =
+                        ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field]);
+                    let reader = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
+                        .await
+                        .unwrap()
+                        .with_row_selection(selection)
+                        .with_row_filter(row_filter.expect("No row filter"))
+                        .with_batch_size(batch_size)
+                        .build()
+                        .expect("Could not create reader");
+                    reader.try_collect::<Vec<_>>().await.unwrap()
+                })
+            },
+        );
+    }
 }
diff --git a/parquet/src/arrow/in_memory_row_group.rs b/parquet/src/arrow/in_memory_row_group.rs
index 7969f6f323..4baa8cf9de 100644
--- a/parquet/src/arrow/in_memory_row_group.rs
+++ b/parquet/src/arrow/in_memory_row_group.rs
@@ -20,7 +20,7 @@ use crate::arrow::array_reader::RowGroups;
 use crate::arrow::arrow_reader::RowSelection;
 use crate::column::page::{PageIterator, PageReader};
 use crate::errors::ParquetError;
-use crate::file::metadata::ParquetMetaData;
+use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
 use bytes::{Buf, Bytes};
@@ -226,6 +226,14 @@ impl RowGroups for InMemoryRowGroup<'_> {
             }
         }
     }
+
+    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
+        Box::new(std::iter::once(self.metadata.row_group(self.row_group_idx)))
+    }
+
+    fn metadata(&self) -> &ParquetMetaData {
+        self.metadata
+    }
 }
 
 /// An in-memory column chunk.
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 34aac8b08a..244364d8d1 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -209,7 +209,8 @@ use arrow_schema::{FieldRef, Schema};
 
 pub use self::schema::{
     ArrowSchemaConverter, FieldLevels, add_encoded_arrow_schema_to_metadata, encode_arrow_schema,
-    parquet_to_arrow_field_levels, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
+    parquet_to_arrow_field_levels, parquet_to_arrow_field_levels_with_virtual,
+    parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns, virtual_type::*,
 };
 
 /// Schema metadata key used to store serialized Arrow schema
@@ -264,7 +265,7 @@ pub struct ProjectionMask {
     /// A mask of `[true, false, true, false]` will result in a schema 2
     /// elements long:
     /// * `fields[0]`: `a`
-    /// * `fields[1]`: `c`    
+    /// * `fields[1]`: `c`
     ///
     /// A mask of `None` will result in a schema 4 elements long:
     /// * `fields[0]`: `a`
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
index 312b8af845..f02d6aa5dc 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -434,6 +434,7 @@ impl RowGroupReaderBuilder {
 
                 let array_reader = ArrayReaderBuilder::new(&row_group, &self.metrics)
                     .with_cache_options(Some(&cache_options))
+                    .with_parquet_metadata(&self.metadata)
                     .build_array_reader(self.fields.as_deref(), predicate.projection())?;
 
                 plan_builder =
@@ -590,7 +591,8 @@ impl RowGroupReaderBuilder {
                 let plan = plan_builder.build();
 
                 // if we have any cached results, connect them up
-                let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics);
+                let array_reader_builder = ArrayReaderBuilder::new(&row_group, &self.metrics)
+                    .with_parquet_metadata(&self.metadata);
                 let array_reader = if let Some(cache_info) = cache_info.as_ref() {
                     let cache_options = cache_info.builder().consumer();
                     array_reader_builder
diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs
index 9622f6270d..8b85cac479 100644
--- a/parquet/src/arrow/schema/complex.rs
+++ b/parquet/src/arrow/schema/complex.rs
@@ -20,12 +20,13 @@ use std::sync::Arc;
 
 use crate::arrow::schema::extension::try_add_extension_type;
 use crate::arrow::schema::primitive::convert_primitive;
+use crate::arrow::schema::virtual_type::RowNumber;
 use crate::arrow::{PARQUET_FIELD_ID_META_KEY, ProjectionMask};
 use crate::basic::{ConvertedType, Repetition};
 use crate::errors::ParquetError;
 use crate::errors::Result;
 use crate::schema::types::{SchemaDescriptor, Type, TypePtr};
-use arrow_schema::{DataType, Field, Fields, SchemaBuilder};
+use arrow_schema::{DataType, Field, Fields, SchemaBuilder, extension::ExtensionType};
 
 fn get_repetition(t: &Type) -> Repetition {
     let info = t.get_basic_info();
@@ -77,10 +78,18 @@ impl ParquetField {
         match &self.field_type {
             ParquetFieldType::Primitive { .. } => None,
             ParquetFieldType::Group { children } => Some(children),
+            ParquetFieldType::Virtual(_) => None,
         }
     }
 }
 
+/// Types of virtual columns that can be computed at read time
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum VirtualColumnType {
+    /// Row number within the file
+    RowNumber,
+}
+
 #[derive(Debug, Clone)]
 pub enum ParquetFieldType {
     Primitive {
@@ -92,6 +101,9 @@ pub enum ParquetFieldType {
     Group {
         children: Vec<ParquetField>,
     },
+    /// Virtual column that doesn't exist in the parquet file
+    /// but is computed at read time (e.g., row_number)
+    Virtual(VirtualColumnType),
 }
 
 /// Encodes the context of the parent of the field currently under consideration
@@ -541,6 +553,55 @@ impl Visitor {
     }
 }
 
+/// Converts a virtual Arrow [`Field`] to a [`ParquetField`]
+///
+/// Virtual fields don't correspond to any data in the parquet file,
+/// but are computed at read time (e.g., row_number)
+///
+/// The levels are computed based on the parent context:
+/// - If nullable: def_level = parent_def_level + 1
+/// - If required: def_level = parent_def_level
+/// - rep_level = parent_rep_level (virtual fields are not repeated)
+pub(super) fn convert_virtual_field(
+    arrow_field: &Field,
+    parent_rep_level: i16,
+    parent_def_level: i16,
+) -> Result<ParquetField> {
+    let nullable = arrow_field.is_nullable();
+    let def_level = if nullable {
+        parent_def_level + 1
+    } else {
+        parent_def_level
+    };
+
+    // Determine the virtual column type based on the extension type name
+    let extension_name = arrow_field.extension_type_name().ok_or_else(|| {
+        ParquetError::ArrowError(format!(
+            "virtual column field '{}' must have an extension type",
+            arrow_field.name()
+        ))
+    })?;
+
+    let virtual_type = match extension_name {
+        RowNumber::NAME => VirtualColumnType::RowNumber,
+        _ => {
+            return Err(ParquetError::ArrowError(format!(
+                "unsupported virtual column type '{}' for field '{}'",
+                extension_name,
+                arrow_field.name()
+            )));
+        }
+    };
+
+    Ok(ParquetField {
+        rep_level: parent_rep_level,
+        def_level,
+        nullable,
+        arrow_type: arrow_field.data_type().clone(),
+        field_type: ParquetFieldType::Virtual(virtual_type),
+    })
+}
+
 /// Computes the Arrow [`Field`] for a child column
 ///
 /// The resulting Arrow [`Field`] will have the type dictated by the Parquet `field`, a name
diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs
index 1d8d2e1fd3..b4a9ba7b7f 100644
--- a/parquet/src/arrow/schema/mod.rs
+++ b/parquet/src/arrow/schema/mod.rs
@@ -23,7 +23,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use arrow_ipc::writer;
-use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
+use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, TimeUnit};
 
 use crate::basic::{
     ConvertedType, LogicalType, Repetition, TimeUnit as ParquetTimeUnit, Type as PhysicalType,
@@ -35,6 +35,7 @@ use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
 mod complex;
 mod extension;
 mod primitive;
+pub mod virtual_type;
 
 use super::PARQUET_FIELD_ID_META_KEY;
 use crate::arrow::ProjectionMask;
@@ -42,7 +43,7 @@ use crate::arrow::schema::extension::{
     has_extension_type, logical_type_for_fixed_size_binary, logical_type_for_string,
     logical_type_for_struct, try_add_extension_type,
 };
-pub(crate) use complex::{ParquetField, ParquetFieldType};
+pub(crate) use complex::{ParquetField, ParquetFieldType, VirtualColumnType};
 
 /// Convert Parquet schema to Arrow schema including optional metadata
 ///
@@ -62,7 +63,7 @@ pub fn parquet_to_arrow_schema_by_columns(
     mask: ProjectionMask,
     key_value_metadata: Option<&Vec<KeyValue>>,
 ) -> Result<Schema> {
-    Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata)?.0)
+    Ok(parquet_to_arrow_schema_and_fields(parquet_schema, mask, key_value_metadata, &[])?.0)
 }
 
 /// Determines the Arrow Schema from a Parquet schema
@@ -74,6 +75,7 @@ pub(crate) fn parquet_to_arrow_schema_and_fields(
     parquet_schema: &SchemaDescriptor,
     mask: ProjectionMask,
     key_value_metadata: Option<&Vec<KeyValue>>,
+    virtual_columns: &[FieldRef],
 ) -> Result<(Schema, Option<ParquetField>)> {
     let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
     let maybe_schema = metadata
@@ -89,7 +91,8 @@ pub(crate) fn parquet_to_arrow_schema_and_fields(
     }
 
     let hint = maybe_schema.as_ref().map(|s| s.fields());
-    let field_levels = parquet_to_arrow_field_levels(parquet_schema, mask, hint)?;
+    let field_levels =
+        parquet_to_arrow_field_levels_with_virtual(parquet_schema, mask, hint, virtual_columns)?;
     let schema = Schema::new_with_metadata(field_levels.fields, metadata);
     Ok((schema, field_levels.levels))
 }
@@ -132,18 +135,123 @@ pub fn parquet_to_arrow_field_levels(
     mask: ProjectionMask,
     hint: Option<&Fields>,
 ) -> Result<FieldLevels> {
-    match complex::convert_schema(schema, mask, hint)? {
-        Some(field) => match &field.arrow_type {
-            DataType::Struct(fields) => Ok(FieldLevels {
-                fields: fields.clone(),
-                levels: Some(field),
-            }),
-            _ => unreachable!(),
-        },
-        None => Ok(FieldLevels {
-            fields: Fields::empty(),
-            levels: None,
+    parquet_to_arrow_field_levels_with_virtual(schema, mask, hint, &[])
+}
+
+/// Convert a parquet [`SchemaDescriptor`] to [`FieldLevels`] with support for virtual columns
+///
+/// Columns not included within [`ProjectionMask`] will be ignored.
+///
+/// The optional `hint` parameter is the desired Arrow schema. See the
+/// [`arrow`] module documentation for more information.
+///
+/// [`arrow`]: crate::arrow
+///
+/// # Arguments
+/// * `schema` - The Parquet schema descriptor
+/// * `mask` - Projection mask to select which columns to include
+/// * `hint` - Optional hint for Arrow field types to use instead of defaults
+/// * `virtual_columns` - Virtual columns to append to the schema (e.g., row numbers)
+///
+/// # Notes:
+/// Where a field type in `hint` is compatible with the corresponding parquet type in `schema`, it
+/// will be used, otherwise the default arrow type for the given parquet column type will be used.
+///
+/// Virtual columns are columns that don't exist in the Parquet file but are 
generated during reading.
+/// They must have extension type names starting with "arrow.virtual.".
+///
+/// This is to accommodate arrow types that cannot be round-tripped through 
parquet natively.
+/// Depending on the parquet writer, this can lead to a mismatch between a 
file's parquet schema
+/// and its embedded arrow schema. The parquet `schema` must be treated as 
authoritative in such
+/// an event. See [#1663](https://github.com/apache/arrow-rs/issues/1663) for 
more information
+///
+/// Note: this is a low-level API, most users will want to make use of the 
higher-level
+/// [`parquet_to_arrow_schema`] for decoding metadata from a parquet file.
+pub fn parquet_to_arrow_field_levels_with_virtual(
+    schema: &SchemaDescriptor,
+    mask: ProjectionMask,
+    hint: Option<&Fields>,
+    virtual_columns: &[FieldRef],
+) -> Result<FieldLevels> {
+    // Validate that all provided fields are virtual columns
+    for field in virtual_columns {
+        if !virtual_type::is_virtual_column(field) {
+            return Err(ParquetError::General(format!(
+                "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'parquet.virtual.'",
+                field.name()
+            )));
+        }
+    }
+
+    // Convert the regular schema first
+    let mut parquet_field = match complex::convert_schema(schema, mask, hint)? 
{
+        Some(field) => field,
+        None if virtual_columns.is_empty() => {
+            return Ok(FieldLevels {
+                fields: Fields::empty(),
+                levels: None,
+            });
+        }
+        None => {
+            // No regular fields, but we have virtual columns - create empty 
root struct
+            ParquetField {
+                rep_level: 0,
+                def_level: 0,
+                nullable: false,
+                arrow_type: DataType::Struct(Fields::empty()),
+                field_type: ParquetFieldType::Group {
+                    children: Vec::new(),
+                },
+            }
+        }
+    };
+
+    // Append virtual columns if any
+    if !virtual_columns.is_empty() {
+        match &mut parquet_field.field_type {
+            ParquetFieldType::Group { children } => {
+                // Get the mutable fields from the struct type
+                let DataType::Struct(ref mut fields) = 
parquet_field.arrow_type else {
+                    unreachable!("Root field must be a struct");
+                };
+
+                // Convert to mutable Vec to append
+                let mut fields_vec: Vec<FieldRef> = 
fields.iter().cloned().collect();
+
+                // Append each virtual column
+                for virtual_column in virtual_columns {
+                    // Virtual columns can only be added at the root level
+                    assert_eq!(
+                        parquet_field.rep_level, 0,
+                        "Virtual columns can only be added at rep level 0"
+                    );
+                    assert_eq!(
+                        parquet_field.def_level, 0,
+                        "Virtual columns can only be added at def level 0"
+                    );
+
+                    fields_vec.push(virtual_column.clone());
+                    let virtual_parquet_field = complex::convert_virtual_field(
+                        virtual_column,
+                        parquet_field.rep_level,
+                        parquet_field.def_level,
+                    )?;
+                    children.push(virtual_parquet_field);
+                }
+
+                // Update the fields
+                parquet_field.arrow_type = 
DataType::Struct(Fields::from(fields_vec));
+            }
+            _ => unreachable!("Root field must be a group"),
+        }
+    }
+
+    match &parquet_field.arrow_type {
+        DataType::Struct(fields) => Ok(FieldLevels {
+            fields: fields.clone(),
+            levels: Some(parquet_field),
         }),
+        _ => unreachable!(),
     }
 }
 
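For orientation, here is a minimal sketch of driving the new `parquet_to_arrow_field_levels_with_virtual` API directly. The `use` paths for the function and for `RowNumber` are assumptions (the actual re-exports are defined in `parquet/src/arrow/mod.rs`, which this excerpt does not show); everything else follows the signature added above:

```
use std::sync::Arc;

use arrow_schema::{DataType, Field};
use parquet::arrow::ProjectionMask;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;
// The exact public paths of these two items are assumptions; adjust to the crate's re-exports.
use parquet::arrow::parquet_to_arrow_field_levels_with_virtual;
use parquet::arrow::schema::virtual_type::RowNumber;

fn main() -> parquet::errors::Result<()> {
    // A trivial parquet schema with one physical column.
    let parquet_schema = parse_message_type("message schema { REQUIRED INT32 id; }")?;
    let descriptor = SchemaDescriptor::new(Arc::new(parquet_schema));

    // A virtual row-number column: Int64 tagged with the `parquet.virtual.row_number` extension.
    let row_number = Arc::new(
        Field::new("row_number", DataType::Int64, false).with_extension_type(RowNumber),
    );

    // The virtual column is appended after whatever the projection mask selects;
    // with an empty mask (e.g. `ProjectionMask::leaves(&descriptor, [])`) the result
    // describes only the virtual column, per the empty-root-struct branch above.
    let _levels = parquet_to_arrow_field_levels_with_virtual(
        &descriptor,
        ProjectionMask::all(),
        None,
        &[row_number],
    )?;
    Ok(())
}
```

The returned `FieldLevels` can then be handed to the lower-level readers, in the same way the row-group example earlier in this diff uses `parquet_to_arrow_field_levels`.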
@@ -2249,4 +2357,32 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_parquet_to_arrow_field_levels_with_virtual_rejects_non_virtual() {
+        let message_type = "
+        message test_schema {
+            REQUIRED INT32 id;
+        }
+        ";
+        let parquet_schema = 
Arc::new(parse_message_type(message_type).unwrap());
+        let descriptor = SchemaDescriptor::new(parquet_schema);
+
+        // Try to pass a regular field (not a virtual column)
+        let regular_field = Arc::new(Field::new("regular_column", 
DataType::Int64, false));
+        let result = parquet_to_arrow_field_levels_with_virtual(
+            &descriptor,
+            ProjectionMask::all(),
+            None,
+            &[regular_field],
+        );
+
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("is not a virtual column")
+        );
+    }
 }
diff --git a/parquet/src/arrow/schema/virtual_type.rs 
b/parquet/src/arrow/schema/virtual_type.rs
new file mode 100644
index 0000000000..eca2aef08d
--- /dev/null
+++ b/parquet/src/arrow/schema/virtual_type.rs
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Extension types for virtual columns, such as [`RowNumber`]
+
+use arrow_schema::{ArrowError, DataType, Field, extension::ExtensionType};
+
+/// Prefix for virtual column extension type names.
+macro_rules! VIRTUAL_PREFIX {
+    () => {
+        "parquet.virtual."
+    };
+}
+
+/// The extension type for row numbers.
+///
+/// Extension name: `parquet.virtual.row_number`.
+///
+/// This virtual column has storage type `Int64` and uses empty string 
metadata.
+#[derive(Debug, Default, Clone, Copy, PartialEq)]
+pub struct RowNumber;
+
+impl ExtensionType for RowNumber {
+    const NAME: &'static str = concat!(VIRTUAL_PREFIX!(), "row_number");
+    type Metadata = &'static str;
+
+    fn metadata(&self) -> &Self::Metadata {
+        &""
+    }
+
+    fn serialize_metadata(&self) -> Option<String> {
+        Some(String::default())
+    }
+
+    fn deserialize_metadata(metadata: Option<&str>) -> Result<Self::Metadata, 
ArrowError> {
+        if metadata.is_some_and(str::is_empty) {
+            Ok("")
+        } else {
+            Err(ArrowError::InvalidArgumentError(
+                "Virtual column extension type expects an empty string as 
metadata".to_owned(),
+            ))
+        }
+    }
+
+    fn supports_data_type(&self, data_type: &DataType) -> Result<(), 
ArrowError> {
+        match data_type {
+            DataType::Int64 => Ok(()),
+            data_type => Err(ArrowError::InvalidArgumentError(format!(
+                "Virtual column data type mismatch, expected Int64, found 
{data_type}"
+            ))),
+        }
+    }
+
+    fn try_new(data_type: &DataType, _metadata: Self::Metadata) -> 
Result<Self, ArrowError> {
+        Self.supports_data_type(data_type).map(|_| Self)
+    }
+}
+
+/// Returns `true` if the field is a virtual column.
+///
+/// Virtual columns have extension type names starting with `parquet.virtual.`.
+pub fn is_virtual_column(field: &Field) -> bool {
+    field
+        .extension_type_name()
+        .map(|name| name.starts_with(VIRTUAL_PREFIX!()))
+        .unwrap_or(false)
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow_schema::{
+        ArrowError, DataType, Field,
+        extension::{EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY},
+    };
+
+    use super::*;
+
+    #[test]
+    fn valid() -> Result<(), ArrowError> {
+        let mut field = Field::new("", DataType::Int64, false);
+        field.try_with_extension_type(RowNumber)?;
+        field.try_extension_type::<RowNumber>()?;
+
+        Ok(())
+    }
+
+    #[test]
+    #[should_panic(expected = "Field extension type name missing")]
+    fn missing_name() {
+        let field = Field::new("", DataType::Int64, false).with_metadata(
+            [(EXTENSION_TYPE_METADATA_KEY.to_owned(), "".to_owned())]
+                .into_iter()
+                .collect(),
+        );
+        field.extension_type::<RowNumber>();
+    }
+
+    #[test]
+    #[should_panic(expected = "expected Int64, found Int32")]
+    fn invalid_type() {
+        Field::new("", DataType::Int32, false).with_extension_type(RowNumber);
+    }
+
+    #[test]
+    #[should_panic(expected = "Virtual column extension type expects an empty 
string as metadata")]
+    fn missing_metadata() {
+        let field = Field::new("", DataType::Int64, false).with_metadata(
+            [(
+                EXTENSION_TYPE_NAME_KEY.to_owned(),
+                RowNumber::NAME.to_owned(),
+            )]
+            .into_iter()
+            .collect(),
+        );
+        field.extension_type::<RowNumber>();
+    }
+
+    #[test]
+    #[should_panic(expected = "Virtual column extension type expects an empty 
string as metadata")]
+    fn invalid_metadata() {
+        let field = Field::new("", DataType::Int64, false).with_metadata(
+            [
+                (
+                    EXTENSION_TYPE_NAME_KEY.to_owned(),
+                    RowNumber::NAME.to_owned(),
+                ),
+                (
+                    EXTENSION_TYPE_METADATA_KEY.to_owned(),
+                    "non-empty".to_owned(),
+                ),
+            ]
+            .into_iter()
+            .collect(),
+        );
+        field.extension_type::<RowNumber>();
+    }
+}
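To make the detection rule concrete, a small hedged sketch follows; the `virtual_type` module path is an assumption, and the metadata key constant comes from `arrow_schema::extension`:

```
use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY;
use arrow_schema::{DataType, Field};
// Path assumed; RowNumber may be re-exported elsewhere (see parquet/src/arrow/mod.rs).
use parquet::arrow::schema::virtual_type::{RowNumber, is_virtual_column};

fn main() {
    // Tagging a field with the RowNumber extension records
    // `ARROW:extension:name = "parquet.virtual.row_number"` in its metadata.
    let row_number = Field::new("row_number", DataType::Int64, false).with_extension_type(RowNumber);
    assert_eq!(
        row_number.metadata().get(EXTENSION_TYPE_NAME_KEY).map(String::as_str),
        Some("parquet.virtual.row_number")
    );

    // Detection is purely prefix based: any extension name under `parquet.virtual.` is virtual.
    assert!(is_virtual_column(&row_number));
    assert!(!is_virtual_column(&Field::new("id", DataType::Int64, false)));
}
```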
diff --git a/parquet/src/file/metadata/thrift/mod.rs 
b/parquet/src/file/metadata/thrift/mod.rs
index 6fdb0e0c4b..225c4d29d2 100644
--- a/parquet/src/file/metadata/thrift/mod.rs
+++ b/parquet/src/file/metadata/thrift/mod.rs
@@ -765,8 +765,17 @@ pub(crate) fn parquet_metadata_from_bytes(
                 let schema_descr = schema_descr.as_ref().unwrap();
                 let list_ident = prot.read_list_begin()?;
                 let mut rg_vec = Vec::with_capacity(list_ident.size as usize);
-                for _ in 0..list_ident.size {
-                    rg_vec.push(read_row_group(&mut prot, schema_descr)?);
+
+                // Read row groups and handle ordinal assignment
+                let mut assigner = OrdinalAssigner::new();
+                for ordinal in 0..list_ident.size {
+                    let ordinal: i16 = ordinal.try_into().map_err(|_| {
+                        ParquetError::General(format!(
+                            "Row group ordinal {ordinal} exceeds i16 max 
value",
+                        ))
+                    })?;
+                    let rg = read_row_group(&mut prot, schema_descr)?;
+                    rg_vec.push(assigner.ensure(ordinal, rg)?);
                 }
                 row_groups = Some(rg_vec);
             }
@@ -858,6 +867,58 @@ pub(crate) fn parquet_metadata_from_bytes(
     Ok(ParquetMetaData::new(fmd, row_groups))
 }
 
+/// Assigns [`RowGroupMetaData::ordinal`] if it is missing, and validates that ordinals are used consistently across row groups.
+#[derive(Debug, Default)]
+pub(crate) struct OrdinalAssigner {
+    first_has_ordinal: Option<bool>,
+}
+
+impl OrdinalAssigner {
+    fn new() -> Self {
+        Default::default()
+    }
+
+    /// Sets [`RowGroupMetaData::ordinal`] if it is missing.
+    ///
+    /// # Arguments
+    /// - `actual_ordinal`: The ordinal (index) of the row group being processed
+    ///   in the file metadata.
+    /// - `rg`: The [`RowGroupMetaData`] to potentially modify.
+    ///
+    /// Ensures:
+    /// 1. If the first row group has an ordinal, all subsequent row groups must
+    ///    also have ordinals; existing ordinals are left untouched.
+    /// 2. If the first row group does NOT have an ordinal, all subsequent row
+    ///    groups must also lack ordinals; each is then assigned `actual_ordinal`.
+    fn ensure(
+        &mut self,
+        actual_ordinal: i16,
+        mut rg: RowGroupMetaData,
+    ) -> Result<RowGroupMetaData> {
+        let rg_has_ordinal = rg.ordinal.is_some();
+
+        // Only set first_has_ordinal if it's None (first row group that 
arrives)
+        if self.first_has_ordinal.is_none() {
+            self.first_has_ordinal = Some(rg_has_ordinal);
+        }
+
+        // assign ordinal if missing and consistent with first row group
+        let first_has_ordinal = self.first_has_ordinal.unwrap();
+        if !first_has_ordinal && !rg_has_ordinal {
+            rg.ordinal = Some(actual_ordinal);
+        } else if first_has_ordinal != rg_has_ordinal {
+            return Err(general_err!(
+                "Inconsistent ordinal assignment: first_has_ordinal is set to \
+                {} but row-group with actual ordinal {} has rg_has_ordinal set 
to {}",
+                first_has_ordinal,
+                actual_ordinal,
+                rg_has_ordinal
+            ));
+        }
+        Ok(rg)
+    }
+}
+
 thrift_struct!(
     pub(crate) struct IndexPageHeader {}
 );
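Because `OrdinalAssigner` is `pub(crate)`, it cannot be exercised from user code; the following stand-alone sketch (hypothetical `RowGroup` type and `assign_ordinals` helper, not part of the crate) simply restates the backfilling invariant described in the doc comment above:

```
// Hypothetical stand-in for `RowGroupMetaData`; only the ordinal matters here.
struct RowGroup {
    ordinal: Option<i16>,
}

fn assign_ordinals(row_groups: &mut [RowGroup]) -> Result<(), String> {
    let mut first_has_ordinal = None;
    for (index, rg) in row_groups.iter_mut().enumerate() {
        let has_ordinal = rg.ordinal.is_some();
        let first = *first_has_ordinal.get_or_insert(has_ordinal);
        if !first && !has_ordinal {
            // No row group so far carried an ordinal: backfill from the position in the file
            // (the real code instead returns an error if the index exceeds i16::MAX).
            rg.ordinal = Some(index as i16);
        } else if first != has_ordinal {
            // Mixing present and absent ordinals is rejected, matching OrdinalAssigner::ensure.
            return Err(format!("inconsistent ordinal at row group {index}"));
        }
    }
    Ok(())
}

fn main() {
    let mut groups = vec![RowGroup { ordinal: None }, RowGroup { ordinal: None }];
    assign_ordinals(&mut groups).unwrap();
    assert_eq!(groups[1].ordinal, Some(1));
}
```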
