This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new d81d6c37ae [parquet] Add row group index virtual column (#9117)
d81d6c37ae is described below

commit d81d6c37ae9ef0f4971657371c0ebf1cf46a67bc
Author: Matthew Kim <[email protected]>
AuthorDate: Thu Jan 15 02:03:06 2026 -1000

    [parquet] Add row group index virtual column (#9117)
    
    # Which issue does this PR close?
    
    - Closes https://github.com/apache/arrow-rs/issues/8641
    - Related to https://github.com/apache/arrow-rs/issues/8799
    
    # Rationale for this change
    
    This PR adds support for exposing row group indices as a virtual column,
    allowing users to determine which row group each row originated from.
    
    The usage pattern is quite simple, something like:
    
    ```rs
    use std::sync::Arc;
    
    use arrow_schema::{DataType, Field};
    use parquet::arrow::RowGroupIndex;
    use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
    
    let row_group_index_field = Arc::new(
        Field::new("row_group_index", DataType::Int64, false)
            .with_extension_type(RowGroupIndex)
    );
    
    let options = ArrowReaderOptions::new()
        .with_virtual_columns(vec![row_group_index_field])?;
    
    let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?
        .build()?;
    ```
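    
    The tests added in this PR also read the virtual column on its own by
    projecting away all stored columns. A minimal sketch adapted from those
    tests (reusing the `file` and `options` values from above):
    
    ```rs
    use parquet::arrow::ProjectionMask;
    use parquet::arrow::arrow_reader::ArrowReaderMetadata;
    
    // Load the metadata with the virtual column registered, then select no
    // stored columns so only the virtual column is produced.
    let metadata = ArrowReaderMetadata::load(&file, options)?;
    let num_columns = metadata
        .metadata
        .file_metadata()
        .schema_descr()
        .num_columns();
    let reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
        .with_projection(ProjectionMask::none(num_columns))
        .build()?;
    // Each batch now contains a single `row_group_index` column.
    ```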
---
 parquet/src/arrow/array_reader/builder.rs         |  16 ++
 parquet/src/arrow/array_reader/mod.rs             |   1 +
 parquet/src/arrow/array_reader/row_group_index.rs | 241 ++++++++++++++++++++++
 parquet/src/arrow/arrow_reader/mod.rs             | 184 ++++++++++++++++-
 parquet/src/arrow/schema/complex.rs               |   5 +-
 parquet/src/arrow/schema/virtual_type.rs          | 114 +++++++++-
 6 files changed, 553 insertions(+), 8 deletions(-)

diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 82c8e77f63..c3d0e2daa3 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -26,6 +26,7 @@ use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
 use crate::arrow::array_reader::empty_array::make_empty_array_reader;
 use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
 use crate::arrow::array_reader::row_group_cache::RowGroupCache;
+use crate::arrow::array_reader::row_group_index::RowGroupIndexReader;
 use crate::arrow::array_reader::row_number::RowNumberReader;
 use crate::arrow::array_reader::{
     ArrayReader, FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
@@ -169,6 +170,9 @@ impl<'a> ArrayReaderBuilder<'a> {
                 // They need to be built by specialized readers
                 match virtual_type {
                     VirtualColumnType::RowNumber => Ok(Some(self.build_row_number_reader()?)),
+                    VirtualColumnType::RowGroupIndex => {
+                        Ok(Some(self.build_row_group_index_reader()?))
+                    }
                 }
             }
             ParquetFieldType::Group { .. } => match &field.arrow_type {
@@ -194,6 +198,18 @@ impl<'a> ArrayReaderBuilder<'a> {
         )?))
     }
 
+    fn build_row_group_index_reader(&self) -> Result<Box<dyn ArrayReader>> {
+        let parquet_metadata = self.parquet_metadata.ok_or_else(|| {
+            ParquetError::General(
+                "ParquetMetaData is required to read virtual row group index 
columns.".to_string(),
+            )
+        })?;
+        Ok(Box::new(RowGroupIndexReader::try_new(
+            parquet_metadata,
+            self.row_groups.row_groups(),
+        )?))
+    }
+
     /// Build array reader for map type.
     fn build_map_reader(
         &self,
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index 54be89f230..019a871e19 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -43,6 +43,7 @@ mod map_array;
 mod null_array;
 mod primitive_array;
 mod row_group_cache;
+mod row_group_index;
 mod row_number;
 mod struct_array;
 
diff --git a/parquet/src/arrow/array_reader/row_group_index.rs b/parquet/src/arrow/array_reader/row_group_index.rs
new file mode 100644
index 0000000000..c0b976fc00
--- /dev/null
+++ b/parquet/src/arrow/array_reader/row_group_index.rs
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::{ParquetError, Result};
+use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use arrow_array::{ArrayRef, Int64Array};
+use arrow_schema::DataType;
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+pub(crate) struct RowGroupIndexReader {
+    buffered_indices: Vec<i64>,
+    remaining_indices: std::iter::Flatten<std::vec::IntoIter<std::iter::RepeatN<i64>>>,
+}
+
+impl RowGroupIndexReader {
+    pub(crate) fn try_new<'a>(
+        parquet_metadata: &'a ParquetMetaData,
+        row_groups: impl Iterator<Item = &'a RowGroupMetaData>,
+    ) -> Result<Self> {
+        // build mapping from ordinal to row group index
+        // this is O(n) where n is the total number of row groups in the file
+        let ordinal_to_index: HashMap<i16, i64> =
+            HashMap::from_iter(parquet_metadata.row_groups().iter().enumerate().filter_map(
+                |(row_group_index, rg)| {
+                    rg.ordinal()
+                        .map(|ordinal| (ordinal, row_group_index as i64))
+                },
+            ));
+
+        // build repeating iterators in the order specified by the row_groups iterator
+        // this is O(m) where m is the number of selected row groups
+        let repeated_indices: Vec<_> = row_groups
+            .map(|rg| {
+                let ordinal = rg.ordinal().ok_or_else(|| {
+                    ParquetError::General(
+                        "Row group missing ordinal field, required to compute 
row group indices"
+                            .to_string(),
+                    )
+                })?;
+
+                let row_group_index = ordinal_to_index.get(&ordinal).ok_or_else(|| {
+                    ParquetError::General(format!(
+                        "Row group with ordinal {} not found in metadata",
+                        ordinal
+                    ))
+                })?;
+
+                // repeat row group index for each row in this row group
+                Ok(std::iter::repeat_n(
+                    *row_group_index,
+                    rg.num_rows() as usize,
+                ))
+            })
+            .collect::<Result<_>>()?;
+
+        Ok(Self {
+            buffered_indices: Vec::new(),
+            remaining_indices: repeated_indices.into_iter().flatten(),
+        })
+    }
+}
+
+impl ArrayReader for RowGroupIndexReader {
+    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+        let starting_len = self.buffered_indices.len();
+        self.buffered_indices
+            .extend((self.remaining_indices.by_ref()).take(batch_size));
+        Ok(self.buffered_indices.len() - starting_len)
+    }
+
+    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+        // TODO: Use advance_by when it stabilizes to improve performance
+        Ok((self.remaining_indices.by_ref()).take(num_records).count())
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &DataType {
+        &DataType::Int64
+    }
+
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        Ok(Arc::new(Int64Array::from_iter(
+            self.buffered_indices.drain(..),
+        )))
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        None
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        None
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::basic::Type as PhysicalType;
+    use crate::file::metadata::{
+        ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData,
+    };
+    use crate::schema::types::{SchemaDescriptor, Type as SchemaType};
+    use std::sync::Arc;
+
+    fn create_test_schema() -> Arc<SchemaDescriptor> {
+        let schema = SchemaType::group_type_builder("schema")
+            .with_fields(vec![Arc::new(
+                SchemaType::primitive_type_builder("test_col", PhysicalType::INT32)
+                    .build()
+                    .unwrap(),
+            )])
+            .build()
+            .unwrap();
+        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
+    }
+
+    fn create_test_parquet_metadata(row_groups: Vec<(i16, i64)>) -> ParquetMetaData {
+        let schema_descr = create_test_schema();
+
+        let mut row_group_metas = vec![];
+        for (ordinal, num_rows) in row_groups {
+            let columns: Vec<_> = schema_descr
+                .columns()
+                .iter()
+                .map(|col| ColumnChunkMetaData::builder(col.clone()).build().unwrap())
+                .collect();
+
+            let row_group = RowGroupMetaData::builder(schema_descr.clone())
+                .set_num_rows(num_rows)
+                .set_ordinal(ordinal)
+                .set_total_byte_size(100)
+                .set_column_metadata(columns)
+                .build()
+                .unwrap();
+            row_group_metas.push(row_group);
+        }
+
+        let total_rows: i64 = row_group_metas.iter().map(|rg| rg.num_rows()).sum();
+        let file_metadata = FileMetaData::new(1, total_rows, None, None, schema_descr, None);
+
+        ParquetMetaData::new(file_metadata, row_group_metas)
+    }
+
+    #[test]
+    fn test_row_group_index_reader_basic() {
+        // create metadata with 3 row groups, each with varying number of rows
+        let metadata = create_test_parquet_metadata(vec![
+            (0, 2), // rg: 0, ordinal: 0, 2 rows
+            (1, 3), // rg: 1, ordinal: 1, 3 rows
+            (2, 1), // rg: 2, ordinal: 2, 1 row
+        ]);
+
+        let selected_row_groups: Vec<_> = metadata.row_groups().iter().collect();
+
+        let mut reader =
+            RowGroupIndexReader::try_new(&metadata, selected_row_groups.into_iter()).unwrap();
+
+        // 2 rows + 3 rows + 1 row
+        let num_read = reader.read_records(6).unwrap();
+        assert_eq!(num_read, 6);
+
+        let array = reader.consume_batch().unwrap();
+        let indices = array.as_any().downcast_ref::<Int64Array>().unwrap();
+
+        let actual: Vec<i64> = indices.iter().map(|v| v.unwrap()).collect();
+        assert_eq!(actual, [0, 0, 1, 1, 1, 2]);
+    }
+
+    #[test]
+    fn test_row_group_index_reader_reverse_order() {
+        // create metadata with 3 row groups, each rg has 2 rows
+        let metadata = create_test_parquet_metadata(vec![(0, 2), (1, 2), (2, 2)]);
+
+        // select only rgs with ordinals 2 and 0 (in that order)
+        // means select row group 2 first, then row group 0, skipping row group 1
+        let selected_row_groups: Vec<_> =
+            vec![&metadata.row_groups()[2], &metadata.row_groups()[0]];
+
+        let mut reader =
+            RowGroupIndexReader::try_new(&metadata, selected_row_groups.into_iter()).unwrap();
+
+        let num_read = reader.read_records(6).unwrap();
+        // 2 rgs * 2 rows each
+        assert_eq!(num_read, 4);
+
+        let array = reader.consume_batch().unwrap();
+        let indices = array.as_any().downcast_ref::<Int64Array>().unwrap();
+
+        let actual: Vec<i64> = indices.iter().map(|v| v.unwrap()).collect();
+
+        assert_eq!(actual, [2, 2, 0, 0]);
+    }
+
+    #[test]
+    fn test_row_group_index_reader_skip_records() {
+        // rg 0: 3 rows, rg 1: 4 rows, rg 2: 2 rows
+        // [0, 0, 0, 1, 1, 1, 1, 2, 2]
+        let metadata = create_test_parquet_metadata(vec![(0, 3), (1, 4), (2, 2)]);
+
+        let selected_row_groups = metadata.row_groups().iter().collect::<Vec<_>>();
+
+        let mut reader =
+            RowGroupIndexReader::try_new(&metadata, selected_row_groups.into_iter()).unwrap();
+
+        // skip first 5 rows
+        // [0, 0, 0, 1, 1, 1, 1, 2, 2]
+        // |---- skip ---|
+        let num_skipped = reader.skip_records(5).unwrap();
+        assert_eq!(num_skipped, 5);
+
+        let num_read = reader.read_records(10).unwrap();
+        assert_eq!(num_read, 4);
+
+        let array = reader.consume_batch().unwrap();
+        let indices = array.as_any().downcast_ref::<Int64Array>().unwrap();
+
+        let actual = indices.iter().map(|v| v.unwrap()).collect::<Vec<i64>>();
+        assert_eq!(actual, [1, 1, 2, 2]);
+    }
+}
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 5641b4e554..cb172e1e38 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -700,7 +700,7 @@ impl ArrowReaderOptions {
 
     /// Include virtual columns in the output.
     ///
-    /// Virtual columns are columns that are not part of the Parquet schema, but are added to the output by the reader such as row numbers.
+    /// Virtual columns are columns that are not part of the Parquet schema, but are added to the output by the reader such as row numbers and row group indices.
     ///
     /// # Example
     /// ```
@@ -1539,7 +1539,10 @@ pub(crate) mod tests {
         ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection,
         RowSelectionPolicy, RowSelector,
     };
-    use crate::arrow::schema::{add_encoded_arrow_schema_to_metadata, virtual_type::RowNumber};
+    use crate::arrow::schema::{
+        add_encoded_arrow_schema_to_metadata,
+        virtual_type::{RowGroupIndex, RowNumber},
+    };
     use crate::arrow::{ArrowWriter, ProjectionMask};
 use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
     use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
@@ -5940,6 +5943,183 @@ pub(crate) mod tests {
         );
     }
 
+    #[test]
+    fn test_read_row_group_indices() {
+        // create a parquet file with 3 row groups, 2 rows each
+        let array1 = Int64Array::from(vec![1, 2]);
+        let array2 = Int64Array::from(vec![3, 4]);
+        let array3 = Int64Array::from(vec![5, 6]);
+
+        let batch1 =
+            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
+        let batch2 =
+            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
+        let batch3 =
+            RecordBatch::try_from_iter(vec![("value", Arc::new(array3) as ArrayRef)]).unwrap();
+
+        let mut buffer = Vec::new();
+        let options = WriterProperties::builder()
+            .set_max_row_group_size(2)
+            .build();
+        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
+        writer.write(&batch1).unwrap();
+        writer.write(&batch2).unwrap();
+        writer.write(&batch3).unwrap();
+        writer.close().unwrap();
+
+        let file = Bytes::from(buffer);
+        let row_group_index_field = Arc::new(
+            Field::new("row_group_index", ArrowDataType::Int64, false)
+                .with_extension_type(RowGroupIndex),
+        );
+
+        let options = ArrowReaderOptions::new()
+            .with_virtual_columns(vec![row_group_index_field.clone()])
+            .unwrap();
+        let mut arrow_reader =
+            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options)
+                .expect("reader builder with virtual columns")
+                .build()
+                .expect("reader with virtual columns");
+
+        let batch = arrow_reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.num_rows(), 6);
+
+        assert_eq!(
+            batch
+                .column(0)
+                .as_primitive::<types::Int64Type>()
+                .iter()
+                .collect::<Vec<_>>(),
+            vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
+        );
+
+        assert_eq!(
+            batch
+                .column(1)
+                .as_primitive::<types::Int64Type>()
+                .iter()
+                .collect::<Vec<_>>(),
+            vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
+        );
+    }
+
+    #[test]
+    fn test_read_only_row_group_indices() {
+        let array1 = Int64Array::from(vec![1, 2, 3]);
+        let array2 = Int64Array::from(vec![4, 5]);
+
+        let batch1 =
+            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as 
ArrayRef)]).unwrap();
+        let batch2 =
+            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as 
ArrayRef)]).unwrap();
+
+        let mut buffer = Vec::new();
+        let options = WriterProperties::builder()
+            .set_max_row_group_size(3)
+            .build();
+        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
+        writer.write(&batch1).unwrap();
+        writer.write(&batch2).unwrap();
+        writer.close().unwrap();
+
+        let file = Bytes::from(buffer);
+        let row_group_index_field = Arc::new(
+            Field::new("row_group_index", ArrowDataType::Int64, false)
+                .with_extension_type(RowGroupIndex),
+        );
+
+        let options = ArrowReaderOptions::new()
+            .with_virtual_columns(vec![row_group_index_field.clone()])
+            .unwrap();
+        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
+        let num_columns = metadata
+            .metadata
+            .file_metadata()
+            .schema_descr()
+            .num_columns();
+
+        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
+            .with_projection(ProjectionMask::none(num_columns))
+            .build()
+            .expect("reader with virtual columns only");
+
+        let batch = arrow_reader.next().unwrap().unwrap();
+        let schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));
+
+        assert_eq!(batch.schema(), schema);
+        assert_eq!(batch.num_columns(), 1);
+        assert_eq!(batch.num_rows(), 5);
+
+        assert_eq!(
+            batch
+                .column(0)
+                .as_primitive::<types::Int64Type>()
+                .iter()
+                .collect::<Vec<_>>(),
+            vec![Some(0), Some(0), Some(0), Some(1), Some(1)]
+        );
+    }
+
+    #[test]
+    fn test_read_row_group_indices_with_selection() -> Result<()> {
+        let mut buffer = Vec::new();
+        let options = WriterProperties::builder()
+            .set_max_row_group_size(10)
+            .build();
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "value",
+            ArrowDataType::Int64,
+            false,
+        )]));
+
+        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(options))?;
+
+        // write out 3 batches of 10 rows each
+        for i in 0..3 {
+            let start = i * 10;
+            let array = Int64Array::from_iter_values(start..start + 10);
+            let batch = RecordBatch::try_from_iter(vec![("value", 
Arc::new(array) as ArrayRef)])?;
+            writer.write(&batch)?;
+        }
+        writer.close()?;
+
+        let file = Bytes::from(buffer);
+        let row_group_index_field = Arc::new(
+            Field::new("rg_idx", ArrowDataType::Int64, 
false).with_extension_type(RowGroupIndex),
+        );
+
+        let options =
+            ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;
+
+        // test row groups are read in reverse order
+        let arrow_reader =
+            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options.clone())?
+                .with_row_groups(vec![2, 1, 0])
+                .build()?;
+
+        let batches: Vec<_> = arrow_reader.collect::<Result<Vec<_>, _>>()?;
+        let combined = concat_batches(&batches[0].schema(), &batches)?;
+
+        let values = combined.column(0).as_primitive::<types::Int64Type>();
+        let first_val = values.value(0);
+        let last_val = values.value(combined.num_rows() - 1);
+        // first row from rg 2
+        assert_eq!(first_val, 20);
+        // the last row from rg 0
+        assert_eq!(last_val, 9);
+
+        let rg_indices = combined.column(1).as_primitive::<types::Int64Type>();
+        assert_eq!(rg_indices.value(0), 2);
+        assert_eq!(rg_indices.value(10), 1);
+        assert_eq!(rg_indices.value(20), 0);
+
+        Ok(())
+    }
+
     pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
         use_filter: bool,
         test_case: F,
diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs
index 8b85cac479..fdb3943e85 100644
--- a/parquet/src/arrow/schema/complex.rs
+++ b/parquet/src/arrow/schema/complex.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 
 use crate::arrow::schema::extension::try_add_extension_type;
 use crate::arrow::schema::primitive::convert_primitive;
-use crate::arrow::schema::virtual_type::RowNumber;
+use crate::arrow::schema::virtual_type::{RowGroupIndex, RowNumber};
 use crate::arrow::{PARQUET_FIELD_ID_META_KEY, ProjectionMask};
 use crate::basic::{ConvertedType, Repetition};
 use crate::errors::ParquetError;
@@ -88,6 +88,8 @@ impl ParquetField {
 pub enum VirtualColumnType {
     /// Row number within the file
     RowNumber,
+    /// Row group index
+    RowGroupIndex,
 }
 
 #[derive(Debug, Clone)]
@@ -584,6 +586,7 @@ pub(super) fn convert_virtual_field(
 
     let virtual_type = match extension_name {
         RowNumber::NAME => VirtualColumnType::RowNumber,
+        RowGroupIndex::NAME => VirtualColumnType::RowGroupIndex,
         _ => {
             return Err(ParquetError::ArrowError(format!(
                 "unsupported virtual column type '{}' for field '{}'",
diff --git a/parquet/src/arrow/schema/virtual_type.rs b/parquet/src/arrow/schema/virtual_type.rs
index d3092a3bd5..b71753f61c 100644
--- a/parquet/src/arrow/schema/virtual_type.rs
+++ b/parquet/src/arrow/schema/virtual_type.rs
@@ -27,6 +27,50 @@ macro_rules! VIRTUAL_PREFIX {
     };
 }
 
+/// The extension type for row group indices.
+///
+/// Extension name: `parquet.virtual.row_group_index`.
+///
+/// This virtual column has storage type `Int64` and uses an empty string as metadata.
+#[derive(Debug, Default, Clone, Copy, PartialEq)]
+pub struct RowGroupIndex;
+
+impl ExtensionType for RowGroupIndex {
+    const NAME: &'static str = concat!(VIRTUAL_PREFIX!(), "row_group_index");
+    type Metadata = &'static str;
+
+    fn metadata(&self) -> &Self::Metadata {
+        &""
+    }
+
+    fn serialize_metadata(&self) -> Option<String> {
+        Some(String::default())
+    }
+
+    fn deserialize_metadata(metadata: Option<&str>) -> Result<Self::Metadata, ArrowError> {
+        if metadata.is_some_and(str::is_empty) {
+            Ok("")
+        } else {
+            Err(ArrowError::InvalidArgumentError(
+                "Virtual column extension type expects an empty string as 
metadata".to_owned(),
+            ))
+        }
+    }
+
+    fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> {
+        match data_type {
+            DataType::Int64 => Ok(()),
+            data_type => Err(ArrowError::InvalidArgumentError(format!(
+                "Virtual column data type mismatch, expected Int64, found 
{data_type}"
+            ))),
+        }
+    }
+
+    fn try_new(data_type: &DataType, _metadata: Self::Metadata) -> Result<Self, ArrowError> {
+        Self.supports_data_type(data_type).map(|_| Self)
+    }
+}
+
 /// The extension type for row numbers.
 ///
 /// Extension name: `parquet.virtual.row_number`.
@@ -90,7 +134,7 @@ mod tests {
     use super::*;
 
     #[test]
-    fn valid() -> Result<(), ArrowError> {
+    fn row_number_valid() -> Result<(), ArrowError> {
         let mut field = Field::new("", DataType::Int64, false);
         field.try_with_extension_type(RowNumber)?;
         field.try_extension_type::<RowNumber>()?;
@@ -100,7 +144,7 @@ mod tests {
 
     #[test]
     #[should_panic(expected = "Field extension type name missing")]
-    fn missing_name() {
+    fn row_number_missing_name() {
         let field = Field::new("", DataType::Int64, false).with_metadata(
             [(EXTENSION_TYPE_METADATA_KEY.to_owned(), "".to_owned())]
                 .into_iter()
@@ -111,13 +155,13 @@ mod tests {
 
     #[test]
     #[should_panic(expected = "expected Int64, found Int32")]
-    fn invalid_type() {
+    fn row_number_invalid_type() {
         Field::new("", DataType::Int32, false).with_extension_type(RowNumber);
     }
 
     #[test]
     #[should_panic(expected = "Virtual column extension type expects an empty 
string as metadata")]
-    fn missing_metadata() {
+    fn row_number_missing_metadata() {
         let field = Field::new("", DataType::Int64, false).with_metadata(
             [(
                 EXTENSION_TYPE_NAME_KEY.to_owned(),
@@ -131,7 +175,7 @@ mod tests {
 
     #[test]
     #[should_panic(expected = "Virtual column extension type expects an empty 
string as metadata")]
-    fn invalid_metadata() {
+    fn row_number_invalid_metadata() {
         let field = Field::new("", DataType::Int64, false).with_metadata(
             [
                 (
@@ -148,4 +192,64 @@ mod tests {
         );
         field.extension_type::<RowNumber>();
     }
+
+    #[test]
+    fn row_group_index_valid() -> Result<(), ArrowError> {
+        let mut field = Field::new("", DataType::Int64, false);
+        field.try_with_extension_type(RowGroupIndex)?;
+        field.try_extension_type::<RowGroupIndex>()?;
+
+        Ok(())
+    }
+
+    #[test]
+    #[should_panic(expected = "Field extension type name missing")]
+    fn row_group_index_missing_name() {
+        let field = Field::new("", DataType::Int64, false).with_metadata(
+            [(EXTENSION_TYPE_METADATA_KEY.to_owned(), "".to_owned())]
+                .into_iter()
+                .collect(),
+        );
+        field.extension_type::<RowGroupIndex>();
+    }
+
+    #[test]
+    #[should_panic(expected = "expected Int64, found Int32")]
+    fn row_group_index_invalid_type() {
+        Field::new("", DataType::Int32, 
false).with_extension_type(RowGroupIndex);
+    }
+
+    #[test]
+    #[should_panic(expected = "Virtual column extension type expects an empty 
string as metadata")]
+    fn row_group_index_missing_metadata() {
+        let field = Field::new("", DataType::Int64, false).with_metadata(
+            [(
+                EXTENSION_TYPE_NAME_KEY.to_owned(),
+                RowGroupIndex::NAME.to_owned(),
+            )]
+            .into_iter()
+            .collect(),
+        );
+        field.extension_type::<RowGroupIndex>();
+    }
+
+    #[test]
+    #[should_panic(expected = "Virtual column extension type expects an empty 
string as metadata")]
+    fn row_group_index_invalid_metadata() {
+        let field = Field::new("", DataType::Int64, false).with_metadata(
+            [
+                (
+                    EXTENSION_TYPE_NAME_KEY.to_owned(),
+                    RowGroupIndex::NAME.to_owned(),
+                ),
+                (
+                    EXTENSION_TYPE_METADATA_KEY.to_owned(),
+                    "non-empty".to_owned(),
+                ),
+            ]
+            .into_iter()
+            .collect(),
+        );
+        field.extension_type::<RowGroupIndex>();
+    }
 }
