scovich commented on code in PR #8715: URL: https://github.com/apache/arrow-rs/pull/8715#discussion_r2535385871
########## parquet/src/arrow/array_reader/row_number.rs: ########## @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::array_reader::ArrayReader; +use crate::errors::{ParquetError, Result}; +use crate::file::metadata::{ParquetMetaData, RowGroupMetaData}; +use arrow_array::{ArrayRef, Int64Array}; +use arrow_schema::DataType; +use std::any::Any; +use std::collections::HashSet; +use std::sync::Arc; + +pub(crate) struct RowNumberReader { + buffered_row_numbers: Vec<i64>, + remaining_row_numbers: std::iter::Flatten<std::vec::IntoIter<std::ops::Range<i64>>>, +} + +impl RowNumberReader { + pub(crate) fn try_new<'a>( + parquet_metadata: &'a ParquetMetaData, + row_groups: impl Iterator<Item = &'a RowGroupMetaData>, + ) -> Result<Self> { + // Collect ordinals from the selected row groups + let selected_ordinals: HashSet<i16> = row_groups + .map(|rg| { + rg.ordinal().ok_or_else(|| { + ParquetError::General( + "Row group missing ordinal field, required to compute row numbers" + .to_string(), + ) + }) + }) + .collect::<Result<_>>()?; + + // Iterate through all row groups once, computing first_row_index and creating ranges + // This is O(M) where M is total row groups, much better than O(N * O) 
where N is selected + let mut first_row_index: i64 = 0; + let mut ranges = Vec::new(); + + for rg in parquet_metadata.row_groups() { + if let Some(ordinal) = rg.ordinal() { + if selected_ordinals.contains(&ordinal) { + ranges.push((ordinal, first_row_index..first_row_index + rg.num_rows())); + } + } + first_row_index += rg.num_rows(); + } + + // Sort ranges by ordinal to maintain original row group order + ranges.sort_by_key(|(ordinal, _)| *ordinal); Review Comment: I don't understand this part. The row groups were supplied in some particular order (by the `row_groups` iterator), and we're reordering by row group ordinal instead? Wouldn't that cause row number mismatches with other columns that continue reading in the original order? It seems like we actually need: ```rust let selected_ordinals: HashMap<i16, usize> = row_groups .enumerate() .map(...) .collect::<Result<_>>()?; ``` and then `ranges` needs to use that enumeration index (not the row group ordinal): ```rust if let Some(i) = selected_ordinals.get(&ordinal) { ranges.push((*i, ...)); } ``` ... so that the sorted ranges match the original `row_groups` iterator's order? ########## parquet/src/arrow/schema/virtual_type.rs: ########## @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! RowNumber +//! + +use arrow_schema::{ArrowError, DataType, Field, extension::ExtensionType}; + +/// Prefix for virtual column extension type names. +macro_rules! VIRTUAL_PREFIX { + () => { + "parquet.virtual." + }; +} + +/// The extension type for row numbers. +/// +/// Extension name: `parquet.virtual.row_number`. +/// +/// This virtual column has storage type `Int64` and uses empty string metadata. +#[derive(Debug, Default, Clone, Copy, PartialEq)] +pub struct RowNumber; + +impl ExtensionType for RowNumber { + const NAME: &'static str = concat!(VIRTUAL_PREFIX!(), "row_number"); + type Metadata = &'static str; + + fn metadata(&self) -> &Self::Metadata { + &"" + } + + fn serialize_metadata(&self) -> Option<String> { + Some(String::default()) + } + + fn deserialize_metadata(metadata: Option<&str>) -> Result<Self::Metadata, ArrowError> { + if metadata.is_some_and(str::is_empty) { + Ok("") + } else { + Err(ArrowError::InvalidArgumentError( + "Virtual column extension type expects an empty string as metadata".to_owned(), + )) + } + } + + fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> { + match data_type { + DataType::Int64 => Ok(()), + data_type => Err(ArrowError::InvalidArgumentError(format!( + "Virtual column data type mismatch, expected Int64, found {data_type}" + ))), + } + } + + fn try_new(data_type: &DataType, _metadata: Self::Metadata) -> Result<Self, ArrowError> { + Self.supports_data_type(data_type).map(|_| Self) + } +} + +/// Returns `true` if the field is a virtual column. +/// +/// Virtual columns have extension type names starting with `parquet.virtual.`. 
+pub fn is_virtual_column(field: &Field) -> bool { + field + .extension_type_name() + .map(|name| name.starts_with(VIRTUAL_PREFIX!())) + .unwrap_or(false) +} Review Comment: ```suggestion .is_some_and(|name| name.starts_with(VIRTUAL_PREFIX!())) ``` ########## parquet/src/arrow/arrow_reader/mod.rs: ########## @@ -566,6 +571,73 @@ impl ArrowReaderOptions { } } + /// Include virtual columns in the output. + /// + /// Virtual columns are columns that are not part of the Parquet schema, but are added to the output by the reader such as row numbers. + /// + /// # Example + /// ``` + /// # use std::sync::Arc; + /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch}; + /// # use arrow_schema::{DataType, Field, Schema}; + /// # use parquet::arrow::{ArrowWriter, RowNumber}; + /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder}; + /// # use tempfile::tempfile; + /// # + /// # fn main() -> Result<(), Box<dyn std::error::Error>> { + /// // Create a simple record batch with some data + /// let values = Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef; + /// let batch = RecordBatch::try_from_iter(vec![("value", values)])?; + /// + /// // Write the batch to a temporary parquet file + /// let file = tempfile()?; + /// let mut writer = ArrowWriter::try_new( + /// file.try_clone()?, + /// batch.schema(), + /// None + /// )?; + /// writer.write(&batch)?; + /// writer.close()?; + /// + /// // Create a virtual column for row numbers + /// let row_number_field = Arc::new(Field::new("row_number", DataType::Int64, false) + /// .with_extension_type(RowNumber)); + /// + /// // Configure options with virtual columns + /// let options = ArrowReaderOptions::new() + /// .with_virtual_columns(vec![row_number_field]); + /// + /// // Create a reader with the options + /// let mut reader = ParquetRecordBatchReaderBuilder::try_new_with_options( + /// file, + /// options + /// )?
+ /// .build()?; + /// + /// // Read the batch - it will include both the original column and the virtual row_number column + /// let result_batch = reader.next().unwrap()?; + /// assert_eq!(result_batch.num_columns(), 2); // "value" + "row_number" + /// assert_eq!(result_batch.num_rows(), 3); + /// # + /// # Ok(()) + /// # } + /// ``` + pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Self { + // Validate that all fields are virtual columns + for field in &virtual_columns { + if !is_virtual_column(field) { + panic!( Review Comment: Not a fan of panics, but if we're going to panic, why not just ```rust assert!( is_virtual_column(field), "... {} ...", field.name() ); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
