This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new d81d6c37ae [parquet] Add row group index virtual column (#9117)
d81d6c37ae is described below
commit d81d6c37ae9ef0f4971657371c0ebf1cf46a67bc
Author: Matthew Kim <[email protected]>
AuthorDate: Thu Jan 15 02:03:06 2026 -1000
[parquet] Add row group index virtual column (#9117)
# Which issue does this PR close?
- Closes https://github.com/apache/arrow-rs/issues/8641
- Related https://github.com/apache/arrow-rs/issues/8799
# Rationale for this change
This PR adds support for exposing row group indices as a virtual column,
allowing users to determine which row group each row originated from.
The usage pattern is straightforward:
```rs
use parquet::arrow::RowGroupIndex;

let row_group_index_field = Arc::new(
    Field::new("row_group_index", DataType::Int64, false)
        .with_extension_type(RowGroupIndex),
);
let options = ArrowReaderOptions::new()
    .with_virtual_columns(vec![row_group_index_field])?;
let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?
    .build()?;
```
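Once built, the reader returns batches in which the virtual column appears as an ordinary `Int64` column next to the projected Parquet columns. A minimal consumption sketch (the column is looked up by the field name chosen above; the `arrow_array::cast::AsArray` usage is illustrative and not part of this change):
```rs
use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;

for batch in reader {
    let batch = batch?;
    // Locate the virtual column by the name given to the Field above.
    let idx = batch.schema().index_of("row_group_index").unwrap();
    let row_group_indices = batch.column(idx).as_primitive::<Int64Type>();
    // Each value is the index of the row group the corresponding row came from.
    for i in 0..batch.num_rows() {
        println!("row {i} came from row group {}", row_group_indices.value(i));
    }
}
```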
---
parquet/src/arrow/array_reader/builder.rs | 16 ++
parquet/src/arrow/array_reader/mod.rs | 1 +
parquet/src/arrow/array_reader/row_group_index.rs | 241 ++++++++++++++++++++++
parquet/src/arrow/arrow_reader/mod.rs | 184 ++++++++++++++++-
parquet/src/arrow/schema/complex.rs | 5 +-
parquet/src/arrow/schema/virtual_type.rs | 114 +++++++++-
6 files changed, 553 insertions(+), 8 deletions(-)
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 82c8e77f63..c3d0e2daa3 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -26,6 +26,7 @@ use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
use crate::arrow::array_reader::row_group_cache::RowGroupCache;
+use crate::arrow::array_reader::row_group_index::RowGroupIndexReader;
use crate::arrow::array_reader::row_number::RowNumberReader;
use crate::arrow::array_reader::{
ArrayReader, FixedSizeListArrayReader, ListArrayReader, MapArrayReader,
NullArrayReader,
@@ -169,6 +170,9 @@ impl<'a> ArrayReaderBuilder<'a> {
// They need to be built by specialized readers
match virtual_type {
VirtualColumnType::RowNumber => Ok(Some(self.build_row_number_reader()?)),
+ VirtualColumnType::RowGroupIndex => {
+ Ok(Some(self.build_row_group_index_reader()?))
+ }
}
}
ParquetFieldType::Group { .. } => match &field.arrow_type {
@@ -194,6 +198,18 @@ impl<'a> ArrayReaderBuilder<'a> {
)?))
}
+ fn build_row_group_index_reader(&self) -> Result<Box<dyn ArrayReader>> {
+ let parquet_metadata = self.parquet_metadata.ok_or_else(|| {
+ ParquetError::General(
+ "ParquetMetaData is required to read virtual row group index columns.".to_string(),
+ )
+ })?;
+ Ok(Box::new(RowGroupIndexReader::try_new(
+ parquet_metadata,
+ self.row_groups.row_groups(),
+ )?))
+ }
+
/// Build array reader for map type.
fn build_map_reader(
&self,
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index 54be89f230..019a871e19 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -43,6 +43,7 @@ mod map_array;
mod null_array;
mod primitive_array;
mod row_group_cache;
+mod row_group_index;
mod row_number;
mod struct_array;
diff --git a/parquet/src/arrow/array_reader/row_group_index.rs b/parquet/src/arrow/array_reader/row_group_index.rs
new file mode 100644
index 0000000000..c0b976fc00
--- /dev/null
+++ b/parquet/src/arrow/array_reader/row_group_index.rs
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::{ParquetError, Result};
+use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
+use arrow_array::{ArrayRef, Int64Array};
+use arrow_schema::DataType;
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::Arc;
+
+pub(crate) struct RowGroupIndexReader {
+ buffered_indices: Vec<i64>,
+ remaining_indices: std::iter::Flatten<std::vec::IntoIter<std::iter::RepeatN<i64>>>,
+}
+
+impl RowGroupIndexReader {
+ pub(crate) fn try_new<'a>(
+ parquet_metadata: &'a ParquetMetaData,
+ row_groups: impl Iterator<Item = &'a RowGroupMetaData>,
+ ) -> Result<Self> {
+ // build mapping from ordinal to row group index
+ // this is O(n) where n is the total number of row groups in the file
+ let ordinal_to_index: HashMap<i16, i64> =
+ HashMap::from_iter(parquet_metadata.row_groups().iter().enumerate().filter_map(
+ |(row_group_index, rg)| {
+ rg.ordinal()
+ .map(|ordinal| (ordinal, row_group_index as i64))
+ },
+ ));
+
+ // build repeating iterators in the order specified by the row_groups iterator
+ // this is O(m) where m is the number of selected row groups
+ let repeated_indices: Vec<_> = row_groups
+ .map(|rg| {
+ let ordinal = rg.ordinal().ok_or_else(|| {
+ ParquetError::General(
+ "Row group missing ordinal field, required to compute row group indices"
+ .to_string(),
+ )
+ })?;
+
+ let row_group_index = ordinal_to_index.get(&ordinal).ok_or_else(|| {
+ ParquetError::General(format!(
+ "Row group with ordinal {} not found in metadata",
+ ordinal
+ ))
+ })?;
+
+ // repeat row group index for each row in this row group
+ Ok(std::iter::repeat_n(
+ *row_group_index,
+ rg.num_rows() as usize,
+ ))
+ })
+ .collect::<Result<_>>()?;
+
+ Ok(Self {
+ buffered_indices: Vec::new(),
+ remaining_indices: repeated_indices.into_iter().flatten(),
+ })
+ }
+}
+
+impl ArrayReader for RowGroupIndexReader {
+ fn read_records(&mut self, batch_size: usize) -> Result<usize> {
+ let starting_len = self.buffered_indices.len();
+ self.buffered_indices
+ .extend((self.remaining_indices.by_ref()).take(batch_size));
+ Ok(self.buffered_indices.len() - starting_len)
+ }
+
+ fn skip_records(&mut self, num_records: usize) -> Result<usize> {
+ // TODO: Use advance_by when it stabilizes to improve performance
+ Ok((self.remaining_indices.by_ref()).take(num_records).count())
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn get_data_type(&self) -> &DataType {
+ &DataType::Int64
+ }
+
+ fn consume_batch(&mut self) -> Result<ArrayRef> {
+ Ok(Arc::new(Int64Array::from_iter(
+ self.buffered_indices.drain(..),
+ )))
+ }
+
+ fn get_def_levels(&self) -> Option<&[i16]> {
+ None
+ }
+
+ fn get_rep_levels(&self) -> Option<&[i16]> {
+ None
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::basic::Type as PhysicalType;
+ use crate::file::metadata::{
+ ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData,
+ };
+ use crate::schema::types::{SchemaDescriptor, Type as SchemaType};
+ use std::sync::Arc;
+
+ fn create_test_schema() -> Arc<SchemaDescriptor> {
+ let schema = SchemaType::group_type_builder("schema")
+ .with_fields(vec![Arc::new(
+ SchemaType::primitive_type_builder("test_col", PhysicalType::INT32)
+ .build()
+ .unwrap(),
+ )])
+ .build()
+ .unwrap();
+ Arc::new(SchemaDescriptor::new(Arc::new(schema)))
+ }
+
+ fn create_test_parquet_metadata(row_groups: Vec<(i16, i64)>) -> ParquetMetaData {
+ let schema_descr = create_test_schema();
+
+ let mut row_group_metas = vec![];
+ for (ordinal, num_rows) in row_groups {
+ let columns: Vec<_> = schema_descr
+ .columns()
+ .iter()
+ .map(|col| ColumnChunkMetaData::builder(col.clone()).build().unwrap())
+ .collect();
+
+ let row_group = RowGroupMetaData::builder(schema_descr.clone())
+ .set_num_rows(num_rows)
+ .set_ordinal(ordinal)
+ .set_total_byte_size(100)
+ .set_column_metadata(columns)
+ .build()
+ .unwrap();
+ row_group_metas.push(row_group);
+ }
+
+ let total_rows: i64 = row_group_metas.iter().map(|rg| rg.num_rows()).sum();
+ let file_metadata = FileMetaData::new(1, total_rows, None, None, schema_descr, None);
+
+ ParquetMetaData::new(file_metadata, row_group_metas)
+ }
+
+ #[test]
+ fn test_row_group_index_reader_basic() {
+ // create metadata with 3 row groups, each with varying number of rows
+ let metadata = create_test_parquet_metadata(vec![
+ (0, 2), // rg: 0, ordinal: 0, 2 rows
+ (1, 3), // rg: 1, ordinal: 1, 3 rows
+ (2, 1), // rg: 2, ordinal: 2, 1 row
+ ]);
+
+ let selected_row_groups: Vec<_> = metadata.row_groups().iter().collect();
+
+ let mut reader =
+ RowGroupIndexReader::try_new(&metadata, selected_row_groups.into_iter()).unwrap();
+
+ // 2 rows + 3 rows + 1 row
+ let num_read = reader.read_records(6).unwrap();
+ assert_eq!(num_read, 6);
+
+ let array = reader.consume_batch().unwrap();
+ let indices = array.as_any().downcast_ref::<Int64Array>().unwrap();
+
+ let actual: Vec<i64> = indices.iter().map(|v| v.unwrap()).collect();
+ assert_eq!(actual, [0, 0, 1, 1, 1, 2],);
+ }
+
+ #[test]
+ fn test_row_group_index_reader_reverse_order() {
+ // create metadata with 3 row groups, each rg has 2 rows
+ let metadata = create_test_parquet_metadata(vec![(0, 2), (1, 2), (2, 2)]);
+
+ // select only rgs with ordinals 2 and 0 (in that order)
+ // means select row group 2 first, then row group 0, skipping row group 1
+ let selected_row_groups: Vec<_> = vec![&metadata.row_groups()[2], &metadata.row_groups()[0]];
+
+ let mut reader =
+ RowGroupIndexReader::try_new(&metadata, selected_row_groups.into_iter()).unwrap();
+
+ let num_read = reader.read_records(6).unwrap();
+ // 2 rgs * 2 rows each
+ assert_eq!(num_read, 4);
+
+ let array = reader.consume_batch().unwrap();
+ let indices = array.as_any().downcast_ref::<Int64Array>().unwrap();
+
+ let actual: Vec<i64> = indices.iter().map(|v| v.unwrap()).collect();
+
+ assert_eq!(actual, [2, 2, 0, 0],);
+ }
+
+ #[test]
+ fn test_row_group_index_reader_skip_records() {
+ // rg 0: 3 rows, rg 1: 4 rows, rg 2: 2 rows
+ // [0, 0, 0, 1, 1, 1, 1, 2, 2]
+ let metadata = create_test_parquet_metadata(vec![(0, 3), (1, 4), (2, 2)]);
+
+ let selected_row_groups = metadata.row_groups().iter().collect::<Vec<_>>();
+
+ let mut reader =
+ RowGroupIndexReader::try_new(&metadata, selected_row_groups.into_iter()).unwrap();
+
+ // skip first 5 rows
+ // [0, 0, 0, 1, 1, 1, 1, 2, 2]
+ // |---- skip ---|
+ let num_skipped = reader.skip_records(5).unwrap();
+ assert_eq!(num_skipped, 5);
+
+ let num_read = reader.read_records(10).unwrap();
+ assert_eq!(num_read, 4);
+
+ let array = reader.consume_batch().unwrap();
+ let indices = array.as_any().downcast_ref::<Int64Array>().unwrap();
+
+ let actual = indices.iter().map(|v| v.unwrap()).collect::<Vec<i64>>();
+ assert_eq!(actual, [1, 1, 2, 2]);
+ }
+}
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 5641b4e554..cb172e1e38 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -700,7 +700,7 @@ impl ArrowReaderOptions {
/// Include virtual columns in the output.
///
- /// Virtual columns are columns that are not part of the Parquet schema, but are added to the output by the reader such as row numbers.
+ /// Virtual columns are columns that are not part of the Parquet schema, but are added to the output by the reader such as row numbers and row group indices.
///
/// # Example
/// ```
@@ -1539,7 +1539,10 @@ pub(crate) mod tests {
ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection,
RowSelectionPolicy, RowSelector,
};
- use crate::arrow::schema::{add_encoded_arrow_schema_to_metadata, virtual_type::RowNumber};
+ use crate::arrow::schema::{
+ add_encoded_arrow_schema_to_metadata,
+ virtual_type::{RowGroupIndex, RowNumber},
+ };
use crate::arrow::{ArrowWriter, ProjectionMask};
use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
@@ -5940,6 +5943,183 @@ pub(crate) mod tests {
);
}
+ #[test]
+ fn test_read_row_group_indices() {
+ // create a parquet file with 3 row groups, 2 rows each
+ let array1 = Int64Array::from(vec![1, 2]);
+ let array2 = Int64Array::from(vec![3, 4]);
+ let array3 = Int64Array::from(vec![5, 6]);
+
+ let batch1 =
+ RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
+ let batch2 =
+ RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
+ let batch3 =
+ RecordBatch::try_from_iter(vec![("value", Arc::new(array3) as ArrayRef)]).unwrap();
+
+ let mut buffer = Vec::new();
+ let options = WriterProperties::builder()
+ .set_max_row_group_size(2)
+ .build();
+ let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
+ writer.write(&batch1).unwrap();
+ writer.write(&batch2).unwrap();
+ writer.write(&batch3).unwrap();
+ writer.close().unwrap();
+
+ let file = Bytes::from(buffer);
+ let row_group_index_field = Arc::new(
+ Field::new("row_group_index", ArrowDataType::Int64, false)
+ .with_extension_type(RowGroupIndex),
+ );
+
+ let options = ArrowReaderOptions::new()
+ .with_virtual_columns(vec![row_group_index_field.clone()])
+ .unwrap();
+ let mut arrow_reader =
+ ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options)
+ .expect("reader builder with virtual columns")
+ .build()
+ .expect("reader with virtual columns");
+
+ let batch = arrow_reader.next().unwrap().unwrap();
+
+ assert_eq!(batch.num_columns(), 2);
+ assert_eq!(batch.num_rows(), 6);
+
+ assert_eq!(
+ batch
+ .column(0)
+ .as_primitive::<types::Int64Type>()
+ .iter()
+ .collect::<Vec<_>>(),
+ vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
+ );
+
+ assert_eq!(
+ batch
+ .column(1)
+ .as_primitive::<types::Int64Type>()
+ .iter()
+ .collect::<Vec<_>>(),
+ vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
+ );
+ }
+
+ #[test]
+ fn test_read_only_row_group_indices() {
+ let array1 = Int64Array::from(vec![1, 2, 3]);
+ let array2 = Int64Array::from(vec![4, 5]);
+
+ let batch1 =
+ RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
+ let batch2 =
+ RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
+
+ let mut buffer = Vec::new();
+ let options = WriterProperties::builder()
+ .set_max_row_group_size(3)
+ .build();
+ let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
+ writer.write(&batch1).unwrap();
+ writer.write(&batch2).unwrap();
+ writer.close().unwrap();
+
+ let file = Bytes::from(buffer);
+ let row_group_index_field = Arc::new(
+ Field::new("row_group_index", ArrowDataType::Int64, false)
+ .with_extension_type(RowGroupIndex),
+ );
+
+ let options = ArrowReaderOptions::new()
+ .with_virtual_columns(vec![row_group_index_field.clone()])
+ .unwrap();
+ let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
+ let num_columns = metadata
+ .metadata
+ .file_metadata()
+ .schema_descr()
+ .num_columns();
+
+ let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
+ .with_projection(ProjectionMask::none(num_columns))
+ .build()
+ .expect("reader with virtual columns only");
+
+ let batch = arrow_reader.next().unwrap().unwrap();
+ let schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));
+
+ assert_eq!(batch.schema(), schema);
+ assert_eq!(batch.num_columns(), 1);
+ assert_eq!(batch.num_rows(), 5);
+
+ assert_eq!(
+ batch
+ .column(0)
+ .as_primitive::<types::Int64Type>()
+ .iter()
+ .collect::<Vec<_>>(),
+ vec![Some(0), Some(0), Some(0), Some(1), Some(1)]
+ );
+ }
+
+ #[test]
+ fn test_read_row_group_indices_with_selection() -> Result<()> {
+ let mut buffer = Vec::new();
+ let options = WriterProperties::builder()
+ .set_max_row_group_size(10)
+ .build();
+
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "value",
+ ArrowDataType::Int64,
+ false,
+ )]));
+
+ let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(options))?;
+
+ // write out 3 batches of 10 rows each
+ for i in 0..3 {
+ let start = i * 10;
+ let array = Int64Array::from_iter_values(start..start + 10);
+ let batch = RecordBatch::try_from_iter(vec![("value", Arc::new(array) as ArrayRef)])?;
+ writer.write(&batch)?;
+ }
+ writer.close()?;
+
+ let file = Bytes::from(buffer);
+ let row_group_index_field = Arc::new(
+ Field::new("rg_idx", ArrowDataType::Int64, false).with_extension_type(RowGroupIndex),
+ );
+
+ let options =
+ ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;
+
+ // test row groups are read in reverse order
+ let arrow_reader =
+ ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options.clone())?
+ .with_row_groups(vec![2, 1, 0])
+ .build()?;
+
+ let batches: Vec<_> = arrow_reader.collect::<Result<Vec<_>, _>>()?;
+ let combined = concat_batches(&batches[0].schema(), &batches)?;
+
+ let values = combined.column(0).as_primitive::<types::Int64Type>();
+ let first_val = values.value(0);
+ let last_val = values.value(combined.num_rows() - 1);
+ // first row from rg 2
+ assert_eq!(first_val, 20);
+ // the last row from rg 0
+ assert_eq!(last_val, 9);
+
+ let rg_indices = combined.column(1).as_primitive::<types::Int64Type>();
+ assert_eq!(rg_indices.value(0), 2);
+ assert_eq!(rg_indices.value(10), 1);
+ assert_eq!(rg_indices.value(20), 0);
+
+ Ok(())
+ }
+
pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
use_filter: bool,
test_case: F,
diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs
index 8b85cac479..fdb3943e85 100644
--- a/parquet/src/arrow/schema/complex.rs
+++ b/parquet/src/arrow/schema/complex.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
use crate::arrow::schema::extension::try_add_extension_type;
use crate::arrow::schema::primitive::convert_primitive;
-use crate::arrow::schema::virtual_type::RowNumber;
+use crate::arrow::schema::virtual_type::{RowGroupIndex, RowNumber};
use crate::arrow::{PARQUET_FIELD_ID_META_KEY, ProjectionMask};
use crate::basic::{ConvertedType, Repetition};
use crate::errors::ParquetError;
@@ -88,6 +88,8 @@ impl ParquetField {
pub enum VirtualColumnType {
/// Row number within the file
RowNumber,
+ /// Row group index
+ RowGroupIndex,
}
#[derive(Debug, Clone)]
@@ -584,6 +586,7 @@ pub(super) fn convert_virtual_field(
let virtual_type = match extension_name {
RowNumber::NAME => VirtualColumnType::RowNumber,
+ RowGroupIndex::NAME => VirtualColumnType::RowGroupIndex,
_ => {
return Err(ParquetError::ArrowError(format!(
"unsupported virtual column type '{}' for field '{}'",
diff --git a/parquet/src/arrow/schema/virtual_type.rs b/parquet/src/arrow/schema/virtual_type.rs
index d3092a3bd5..b71753f61c 100644
--- a/parquet/src/arrow/schema/virtual_type.rs
+++ b/parquet/src/arrow/schema/virtual_type.rs
@@ -27,6 +27,50 @@ macro_rules! VIRTUAL_PREFIX {
};
}
+/// The extension type for row group indices
+///
+/// Extension name: `parquet.virtual.row_group_index`
+///
+/// This virtual column has storage type `Int64` and uses empty string metadata
+#[derive(Debug, Default, Clone, Copy, PartialEq)]
+pub struct RowGroupIndex;
+
+impl ExtensionType for RowGroupIndex {
+ const NAME: &'static str = concat!(VIRTUAL_PREFIX!(), "row_group_index");
+ type Metadata = &'static str;
+
+ fn metadata(&self) -> &Self::Metadata {
+ &""
+ }
+
+ fn serialize_metadata(&self) -> Option<String> {
+ Some(String::default())
+ }
+
+ fn deserialize_metadata(metadata: Option<&str>) -> Result<Self::Metadata, ArrowError> {
+ if metadata.is_some_and(str::is_empty) {
+ Ok("")
+ } else {
+ Err(ArrowError::InvalidArgumentError(
+ "Virtual column extension type expects an empty string as metadata".to_owned(),
+ ))
+ }
+ }
+
+ fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> {
+ match data_type {
+ DataType::Int64 => Ok(()),
+ data_type => Err(ArrowError::InvalidArgumentError(format!(
+ "Virtual column data type mismatch, expected Int64, found {data_type}"
+ ))),
+ }
+ }
+
+ fn try_new(data_type: &DataType, _metadata: Self::Metadata) -> Result<Self, ArrowError> {
+ Self.supports_data_type(data_type).map(|_| Self)
+ }
+}
+
/// The extension type for row numbers.
///
/// Extension name: `parquet.virtual.row_number`.
@@ -90,7 +134,7 @@ mod tests {
use super::*;
#[test]
- fn valid() -> Result<(), ArrowError> {
+ fn row_number_valid() -> Result<(), ArrowError> {
let mut field = Field::new("", DataType::Int64, false);
field.try_with_extension_type(RowNumber)?;
field.try_extension_type::<RowNumber>()?;
@@ -100,7 +144,7 @@ mod tests {
#[test]
#[should_panic(expected = "Field extension type name missing")]
- fn missing_name() {
+ fn row_number_missing_name() {
let field = Field::new("", DataType::Int64, false).with_metadata(
[(EXTENSION_TYPE_METADATA_KEY.to_owned(), "".to_owned())]
.into_iter()
@@ -111,13 +155,13 @@ mod tests {
#[test]
#[should_panic(expected = "expected Int64, found Int32")]
- fn invalid_type() {
+ fn row_number_invalid_type() {
Field::new("", DataType::Int32, false).with_extension_type(RowNumber);
}
#[test]
#[should_panic(expected = "Virtual column extension type expects an empty string as metadata")]
- fn missing_metadata() {
+ fn row_number_missing_metadata() {
let field = Field::new("", DataType::Int64, false).with_metadata(
[(
EXTENSION_TYPE_NAME_KEY.to_owned(),
@@ -131,7 +175,7 @@ mod tests {
#[test]
#[should_panic(expected = "Virtual column extension type expects an empty string as metadata")]
- fn invalid_metadata() {
+ fn row_number_invalid_metadata() {
let field = Field::new("", DataType::Int64, false).with_metadata(
[
(
@@ -148,4 +192,64 @@ mod tests {
);
field.extension_type::<RowNumber>();
}
+
+ #[test]
+ fn row_group_index_valid() -> Result<(), ArrowError> {
+ let mut field = Field::new("", DataType::Int64, false);
+ field.try_with_extension_type(RowGroupIndex)?;
+ field.try_extension_type::<RowGroupIndex>()?;
+
+ Ok(())
+ }
+
+ #[test]
+ #[should_panic(expected = "Field extension type name missing")]
+ fn row_group_index_missing_name() {
+ let field = Field::new("", DataType::Int64, false).with_metadata(
+ [(EXTENSION_TYPE_METADATA_KEY.to_owned(), "".to_owned())]
+ .into_iter()
+ .collect(),
+ );
+ field.extension_type::<RowGroupIndex>();
+ }
+
+ #[test]
+ #[should_panic(expected = "expected Int64, found Int32")]
+ fn row_group_index_invalid_type() {
+ Field::new("", DataType::Int32, false).with_extension_type(RowGroupIndex);
+ }
+
+ #[test]
+ #[should_panic(expected = "Virtual column extension type expects an empty string as metadata")]
+ fn row_group_index_missing_metadata() {
+ let field = Field::new("", DataType::Int64, false).with_metadata(
+ [(
+ EXTENSION_TYPE_NAME_KEY.to_owned(),
+ RowGroupIndex::NAME.to_owned(),
+ )]
+ .into_iter()
+ .collect(),
+ );
+ field.extension_type::<RowGroupIndex>();
+ }
+
+ #[test]
+ #[should_panic(expected = "Virtual column extension type expects an empty string as metadata")]
+ fn row_group_index_invalid_metadata() {
+ let field = Field::new("", DataType::Int64, false).with_metadata(
+ [
+ (
+ EXTENSION_TYPE_NAME_KEY.to_owned(),
+ RowGroupIndex::NAME.to_owned(),
+ ),
+ (
+ EXTENSION_TYPE_METADATA_KEY.to_owned(),
+ "non-empty".to_owned(),
+ ),
+ ]
+ .into_iter()
+ .collect(),
+ );
+ field.extension_type::<RowGroupIndex>();
+ }
}