alamb commented on code in PR #8122: URL: https://github.com/apache/arrow-rs/pull/8122#discussion_r2277771760
########## parquet-variant-compute/src/variant_get/mod.rs: ########## @@ -15,50 +15,208 @@ // specific language governing permissions and limitations // under the License. use arrow::{ - array::{Array, ArrayRef}, + array::{self, Array, ArrayRef, BinaryViewArray, StructArray}, compute::CastOptions, error::Result, }; -use arrow_schema::{ArrowError, FieldRef}; -use parquet_variant::VariantPath; +use arrow_schema::{ArrowError, DataType, FieldRef}; +use parquet_variant::{VariantPath, VariantPathElement}; use crate::variant_array::ShreddingState; -use crate::variant_get::output::instantiate_output_builder; -use crate::VariantArray; +use crate::{variant_array::ShreddedVariantFieldArray, VariantArray}; + +use std::sync::Arc; mod output; +pub(crate) enum ShreddedPathStep<'a> { + /// Path step succeeded, return the new shredding state + Success(&'a ShreddingState), + /// The path element is not present in the `typed_value` column and there is no `value` column, + /// so we we know it does not exist. It, and all paths under it, are all-NULL. + Missing, + /// The path element is not present in the `typed_value` and must be retrieved from the `value` + /// column instead. The caller should be prepared to handle any value, including the requested + /// type, an arbitrary "wrong" type, or `Variant::Null`. + NotShredded, +} + +/// Given a shredded variant field -- a `(value?, typed_value?)` pair -- try to take one path step +/// deeper. For a `VariantPathElement::Field`, the step fails if there is no `typed_value` at this +/// level, or if `typed_value` is not a struct, or if the requested field name does not exist. +/// +/// TODO: Support `VariantPathElement::Index`? It wouldn't be easy, and maybe not even possible. +pub(crate) fn follow_shredded_path_element<'a>( + shredding_state: &'a ShreddingState, + path_element: &VariantPathElement<'_>, +) -> Result<ShreddedPathStep<'a>> { + // If the requested path element 's not present in `typed_value`, and `value` is missing, then + // we know it does not exist; it, and all paths under it, are all-NULL. + let missing_path_step = || { + if shredding_state.value_field().is_none() { + ShreddedPathStep::Missing + } else { + ShreddedPathStep::NotShredded + } + }; + + let Some(typed_value) = shredding_state.typed_value_field() else { + return Ok(missing_path_step()); + }; + + match path_element { + VariantPathElement::Field { name } => { + // Try to step into the requested field name of a struct. + let Some(field) = typed_value + .as_any() + .downcast_ref::<StructArray>() + .and_then(|typed_value| typed_value.column_by_name(name)) + else { + return Ok(missing_path_step()); + }; + + let field = field + .as_any() + .downcast_ref::<ShreddedVariantFieldArray>() + .ok_or_else(|| { + // TODO: Should we blow up? Or just end the traversal and let the normal + // variant pathing code sort out the mess that it must anyway be + // prepared to handle? + ArrowError::InvalidArgumentError(format!( + "Expected a ShreddedVariantFieldArray, got {:?} instead", + field.data_type(), + )) + })?; + + Ok(ShreddedPathStep::Success(field.shredding_state())) + } + VariantPathElement::Index { .. } => { + // TODO: Support array indexing. Among other things, it will require slicing not + // only the array we have here, but also the corresponding metadata and null masks. + Err(ArrowError::NotYetImplemented( + "Pathing into shredded variant array index".into(), + )) + } + } +} + +/// Follows the given path as far as possible through shredded variant fields. If the path ends on a +/// shredded field, return it directly. Otherwise, use a row shredder to follow the rest of the path +/// and extract the requested value on a per-row basis. +fn shredded_get_path( + input: &VariantArray, + path: &[VariantPathElement<'_>], + as_type: Option<&DataType>, +) -> Result<ArrayRef> { + // Helper that creates a new VariantArray from the given nested value and typed_value columns, + let make_target_variant = |value: Option<BinaryViewArray>, typed_value: Option<ArrayRef>| { + let metadata = input.metadata_field().clone(); + let nulls = input.inner().nulls().cloned(); + VariantArray::from_parts( + metadata, + value, + typed_value, + nulls, + ) + }; + + // Helper that shreds a VariantArray to a specific type. + let shred_basic_variant = |target: VariantArray, path: VariantPath<'_>, as_type: Option<&DataType>| { + let mut builder = output::struct_output::make_shredding_row_builder(path, as_type)?; + for i in 0..target.len() { + if target.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(&target.value(i))?; + } + } + builder.finish() + }; + + // Peel away the prefix of path elements that traverses the shredded parts of this variant + // column. Shredding will traverse the rest of the path on a per-row basis. + let mut shredding_state = input.shredding_state(); + let mut path_index = 0; + for path_element in path { + match follow_shredded_path_element(shredding_state, path_element)? { + ShreddedPathStep::Success(state) => { + shredding_state = state; + path_index += 1; + continue; + } + ShreddedPathStep::Missing => { + let num_rows = input.len(); + let arr = match as_type { + Some(data_type) => Arc::new(array::new_null_array(data_type, num_rows)) as _, + None => Arc::new(array::NullArray::new(num_rows)) as _, + }; + return Ok(arr); + } + ShreddedPathStep::NotShredded => { + let target = make_target_variant(shredding_state.value_field().cloned(), None); + return shred_basic_variant(target, path[path_index..].into(), as_type); + } + }; + } + + // Path exhausted! Create a new `VariantArray` for the location we landed on. + let target = make_target_variant( + shredding_state.value_field().cloned(), + shredding_state.typed_value_field().cloned(), + ); + + // If our caller did not request any specific type, we can just return whatever we landed on. + let Some(data_type) = as_type else { + return Ok(Arc::new(target)); + }; + + // Structs are special. Recurse into each field separately, hoping to follow the shredding even + // further, and build up the final struct from those individually shredded results. + if let DataType::Struct(fields) = data_type { + let children = fields + .iter() + .map(|field| { + shredded_get_path( + &target, + &[VariantPathElement::from(field.name().as_str())], + Some(field.data_type()), + ) + }) + .collect::<Result<Vec<_>>>()?; + + return Ok(Arc::new(StructArray::try_new( + fields.clone(), + children, + target.nulls().cloned(), + )?)); + } + + // Not a struct, so directly shred the variant as the requested type + shred_basic_variant(target, VariantPath::default(), as_type) +} + /// Returns an array with the specified path extracted from the variant values. /// /// The return array type depends on the `as_type` field of the options parameter /// 1. `as_type: None`: a VariantArray is returned. The values in this new VariantArray will point /// to the specified path. /// 2. `as_type: Some(<specific field>)`: an array of the specified type is returned. +/// +/// TODO: How would a caller request a struct or list type where the fields/elements can be any +/// variant? Caller can pass None as the requested type to fetch a specific path, but it would +/// quickly become annoying (and inefficient) to call `variant_get` for each leaf value in a struct or +/// list and then try to assemble the results. pub fn variant_get(input: &ArrayRef, options: GetOptions) -> Result<ArrayRef> { Review Comment: Thank you 🙏 -- my plan is to try and turn these into tickets tomorrow morning -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org