scovich commented on code in PR #8122: URL: https://github.com/apache/arrow-rs/pull/8122#discussion_r2270791236
########## parquet-variant-compute/src/variant_array.rs: ########## @@ -48,6 +48,9 @@ pub struct VariantArray { /// Reference to the underlying StructArray inner: StructArray, + /// The metadata column of this variant + metadata: BinaryViewArray, Review Comment: In this pathfinding, `ShreddingState` -- already used for referring to both top-level variant columns (fields: metadata, value, typed_value) -- also proved useful for referring to nested shredded variant columns (fields: value, typed_value). So I hoisted out the `metadata` column to unlock that new use case. ########## parquet-variant-compute/src/variant_array.rs: ########## @@ -102,31 +105,42 @@ impl VariantArray { ))); }; - // Find the value field, if present - let value = inner - .column_by_name("value") - .map(|v| { - v.as_binary_view_opt().ok_or_else(|| { - ArrowError::NotYetImplemented(format!( - "VariantArray 'value' field must be BinaryView, got {}", - v.data_type() - )) - }) - }) - .transpose()?; - - // Find the typed_value field, if present - let typed_value = inner.column_by_name("typed_value"); - // Note these clones are cheap, they just bump the ref count - let inner = inner.clone(); - let shredding_state = - ShreddingState::try_new(metadata.clone(), value.cloned(), typed_value.cloned())?; - Ok(Self { + inner: inner.clone(), + metadata: metadata.clone(), + shredding_state: ShreddingState::try_new(inner)?, + }) + } + + #[allow(unused)] + pub(crate) fn from_parts( + metadata: BinaryViewArray, + value: Option<BinaryViewArray>, + typed_value: Option<ArrayRef>, + nulls: Option<NullBuffer>, + ) -> Self { + let mut builder = + StructArrayBuilder::new().with_field("metadata", Arc::new(metadata.clone())); + if let Some(value) = value.clone() { + builder = builder.with_field("value", Arc::new(value)); + } + if let Some(typed_value) = typed_value.clone() { + builder = builder.with_field("typed_value", typed_value); + } + if let Some(nulls) = nulls { + builder = builder.with_nulls(nulls); + } + + // This 
would be a lot simpler if ShreddingState were just a pair of Option... we already + // have everything we need. + let inner = builder.build(); + let shredding_state = ShreddingState::try_new(&inner).unwrap(); // valid by construction Review Comment: Technically not quite "valid by construction" today, because the enum can't handle the (None, None) case yet. But it will, soon enough. ########## parquet-variant-compute/src/variant_get/mod.rs: ########## @@ -15,50 +15,208 @@ // specific language governing permissions and limitations // under the License. use arrow::{ - array::{Array, ArrayRef}, + array::{self, Array, ArrayRef, BinaryViewArray, StructArray}, compute::CastOptions, error::Result, }; -use arrow_schema::{ArrowError, FieldRef}; -use parquet_variant::VariantPath; +use arrow_schema::{ArrowError, DataType, FieldRef}; +use parquet_variant::{VariantPath, VariantPathElement}; use crate::variant_array::ShreddingState; -use crate::variant_get::output::instantiate_output_builder; -use crate::VariantArray; +use crate::{variant_array::ShreddedVariantFieldArray, VariantArray}; + +use std::sync::Arc; mod output; +pub(crate) enum ShreddedPathStep<'a> { + /// Path step succeeded, return the new shredding state + Success(&'a ShreddingState), + /// The path element is not present in the `typed_value` column and there is no `value` column, + /// so we we know it does not exist. It, and all paths under it, are all-NULL. + Missing, + /// The path element is not present in the `typed_value` and must be retrieved from the `value` + /// column instead. The caller should be prepared to handle any value, including the requested + /// type, an arbitrary "wrong" type, or `Variant::Null`. + NotShredded, +} + +/// Given a shredded variant field -- a `(value?, typed_value?)` pair -- try to take one path step +/// deeper. 
For a `VariantPathElement::Field`, the step fails if there is no `typed_value` at this +/// level, or if `typed_value` is not a struct, or if the requested field name does not exist. +/// +/// TODO: Support `VariantPathElement::Index`? It wouldn't be easy, and maybe not even possible. Review Comment: The problem is that array indexing requires per-row slicing, and I'm not sure what that would look like. Maybe extensive use of view arrays? Or just copying the relevant bytes into new arrays? ########## parquet-variant-compute/src/variant_get/mod.rs: ########## @@ -15,50 +15,208 @@ // specific language governing permissions and limitations // under the License. use arrow::{ - array::{Array, ArrayRef}, + array::{self, Array, ArrayRef, BinaryViewArray, StructArray}, compute::CastOptions, error::Result, }; -use arrow_schema::{ArrowError, FieldRef}; -use parquet_variant::VariantPath; +use arrow_schema::{ArrowError, DataType, FieldRef}; +use parquet_variant::{VariantPath, VariantPathElement}; use crate::variant_array::ShreddingState; -use crate::variant_get::output::instantiate_output_builder; -use crate::VariantArray; +use crate::{variant_array::ShreddedVariantFieldArray, VariantArray}; + +use std::sync::Arc; mod output; +pub(crate) enum ShreddedPathStep<'a> { + /// Path step succeeded, return the new shredding state + Success(&'a ShreddingState), + /// The path element is not present in the `typed_value` column and there is no `value` column, + /// so we we know it does not exist. It, and all paths under it, are all-NULL. + Missing, + /// The path element is not present in the `typed_value` and must be retrieved from the `value` + /// column instead. The caller should be prepared to handle any value, including the requested + /// type, an arbitrary "wrong" type, or `Variant::Null`. + NotShredded, +} + +/// Given a shredded variant field -- a `(value?, typed_value?)` pair -- try to take one path step +/// deeper.
For a `VariantPathElement::Field`, the step fails if there is no `typed_value` at this +/// level, or if `typed_value` is not a struct, or if the requested field name does not exist. +/// +/// TODO: Support `VariantPathElement::Index`? It wouldn't be easy, and maybe not even possible. +pub(crate) fn follow_shredded_path_element<'a>( + shredding_state: &'a ShreddingState, + path_element: &VariantPathElement<'_>, +) -> Result<ShreddedPathStep<'a>> { + // If the requested path element 's not present in `typed_value`, and `value` is missing, then Review Comment: ```suggestion // If the requested path element is not present in `typed_value`, and `value` is missing, then ``` ########## parquet-variant-compute/src/variant_get/mod.rs: ########## @@ -388,43 +546,4 @@ mod test { VariantArray::try_new(Arc::new(struct_array)).expect("should create variant array"), ) } - - /// Builds struct arrays from component fields - /// - /// TODO: move to arrow crate - #[derive(Debug, Default, Clone)] - struct StructArrayBuilder { Review Comment: Moved, not deleted. ########## parquet-variant-compute/src/variant_get/mod.rs: ########## @@ -15,50 +15,208 @@ // specific language governing permissions and limitations // under the License. 
use arrow::{ - array::{Array, ArrayRef}, + array::{self, Array, ArrayRef, BinaryViewArray, StructArray}, compute::CastOptions, error::Result, }; -use arrow_schema::{ArrowError, FieldRef}; -use parquet_variant::VariantPath; +use arrow_schema::{ArrowError, DataType, FieldRef}; +use parquet_variant::{VariantPath, VariantPathElement}; use crate::variant_array::ShreddingState; -use crate::variant_get::output::instantiate_output_builder; -use crate::VariantArray; +use crate::{variant_array::ShreddedVariantFieldArray, VariantArray}; + +use std::sync::Arc; mod output; +pub(crate) enum ShreddedPathStep<'a> { + /// Path step succeeded, return the new shredding state + Success(&'a ShreddingState), + /// The path element is not present in the `typed_value` column and there is no `value` column, + /// so we we know it does not exist. It, and all paths under it, are all-NULL. + Missing, + /// The path element is not present in the `typed_value` and must be retrieved from the `value` + /// column instead. The caller should be prepared to handle any value, including the requested + /// type, an arbitrary "wrong" type, or `Variant::Null`. + NotShredded, +} + +/// Given a shredded variant field -- a `(value?, typed_value?)` pair -- try to take one path step +/// deeper. For a `VariantPathElement::Field`, the step fails if there is no `typed_value` at this +/// level, or if `typed_value` is not a struct, or if the requested field name does not exist. +/// +/// TODO: Support `VariantPathElement::Index`? It wouldn't be easy, and maybe not even possible. +pub(crate) fn follow_shredded_path_element<'a>( + shredding_state: &'a ShreddingState, + path_element: &VariantPathElement<'_>, +) -> Result<ShreddedPathStep<'a>> { + // If the requested path element 's not present in `typed_value`, and `value` is missing, then + // we know it does not exist; it, and all paths under it, are all-NULL. 
+ let missing_path_step = || { + if shredding_state.value_field().is_none() { + ShreddedPathStep::Missing + } else { + ShreddedPathStep::NotShredded + } + }; + + let Some(typed_value) = shredding_state.typed_value_field() else { + return Ok(missing_path_step()); + }; + + match path_element { + VariantPathElement::Field { name } => { + // Try to step into the requested field name of a struct. + let Some(field) = typed_value + .as_any() + .downcast_ref::<StructArray>() + .and_then(|typed_value| typed_value.column_by_name(name)) + else { + return Ok(missing_path_step()); + }; + + let field = field + .as_any() + .downcast_ref::<ShreddedVariantFieldArray>() + .ok_or_else(|| { + // TODO: Should we blow up? Or just end the traversal and let the normal + // variant pathing code sort out the mess that it must anyway be + // prepared to handle? + ArrowError::InvalidArgumentError(format!( + "Expected a ShreddedVariantFieldArray, got {:?} instead", + field.data_type(), + )) + })?; + + Ok(ShreddedPathStep::Success(field.shredding_state())) + } + VariantPathElement::Index { .. } => { + // TODO: Support array indexing. Among other things, it will require slicing not + // only the array we have here, but also the corresponding metadata and null masks. + Err(ArrowError::NotYetImplemented( + "Pathing into shredded variant array index".into(), + )) + } + } +} + +/// Follows the given path as far as possible through shredded variant fields. If the path ends on a +/// shredded field, return it directly. Otherwise, use a row shredder to follow the rest of the path +/// and extract the requested value on a per-row basis. 
+fn shredded_get_path( + input: &VariantArray, + path: &[VariantPathElement<'_>], + as_type: Option<&DataType>, +) -> Result<ArrayRef> { + // Helper that creates a new VariantArray from the given nested value and typed_value columns, + let make_target_variant = |value: Option<BinaryViewArray>, typed_value: Option<ArrayRef>| { + let metadata = input.metadata_field().clone(); + let nulls = input.inner().nulls().cloned(); + VariantArray::from_parts( + metadata, + value, + typed_value, + nulls, + ) + }; + + // Helper that shreds a VariantArray to a specific type. + let shred_basic_variant = |target: VariantArray, path: VariantPath<'_>, as_type: Option<&DataType>| { + let mut builder = output::struct_output::make_shredding_row_builder(path, as_type)?; + for i in 0..target.len() { + if target.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(&target.value(i))?; + } + } + builder.finish() + }; + + // Peel away the prefix of path elements that traverses the shredded parts of this variant + // column. Shredding will traverse the rest of the path on a per-row basis. + let mut shredding_state = input.shredding_state(); + let mut path_index = 0; + for path_element in path { + match follow_shredded_path_element(shredding_state, path_element)? { + ShreddedPathStep::Success(state) => { + shredding_state = state; + path_index += 1; + continue; + } + ShreddedPathStep::Missing => { + let num_rows = input.len(); + let arr = match as_type { + Some(data_type) => Arc::new(array::new_null_array(data_type, num_rows)) as _, + None => Arc::new(array::NullArray::new(num_rows)) as _, + }; + return Ok(arr); + } + ShreddedPathStep::NotShredded => { + let target = make_target_variant(shredding_state.value_field().cloned(), None); + return shred_basic_variant(target, path[path_index..].into(), as_type); + } + }; + } + + // Path exhausted! Create a new `VariantArray` for the location we landed on. 
+ let target = make_target_variant( + shredding_state.value_field().cloned(), + shredding_state.typed_value_field().cloned(), + ); + + // If our caller did not request any specific type, we can just return whatever we landed on. + let Some(data_type) = as_type else { + return Ok(Arc::new(target)); + }; + + // Structs are special. Recurse into each field separately, hoping to follow the shredding even + // further, and build up the final struct from those individually shredded results. + if let DataType::Struct(fields) = data_type { + let children = fields + .iter() + .map(|field| { + shredded_get_path( + &target, + &[VariantPathElement::from(field.name().as_str())], + Some(field.data_type()), + ) + }) + .collect::<Result<Vec<_>>>()?; + + return Ok(Arc::new(StructArray::try_new( + fields.clone(), + children, + target.nulls().cloned(), Review Comment: Aside: I'm pretty sure this PR is generally _NOT_ fully handling null masks correctly. In particular, I'm pretty sure we need to union the null masks of all ancestor fields when pathing through shredded variant data. Otherwise the resulting projected column will have incomplete null masks. ########## parquet-variant-compute/src/variant_get/output/struct_output.rs: ########## @@ -0,0 +1,371 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, AsArray as _, NullBufferBuilder}; +use arrow::datatypes; +use arrow::datatypes::{ArrowPrimitiveType, FieldRef}; +use arrow::error::{ArrowError, Result}; +use parquet_variant::{Variant, VariantObject, VariantPath}; + +use std::sync::Arc; + +#[allow(unused)] +pub(crate) fn make_shredding_row_builder( + //metadata: &BinaryViewArray, + path: VariantPath<'_>, + data_type: Option<&datatypes::DataType>, +) -> Result<Box<dyn VariantShreddingRowBuilder>> { + todo!() // wire it all up! +} + +/// Builder for shredding variant values into strongly typed Arrow arrays. +/// +/// Useful for variant_get kernels that need to extract specific paths from variant values, possibly +/// with casting of leaf values to specific types. +#[allow(unused)] +pub(crate) trait VariantShreddingRowBuilder { + fn append_null(&mut self) -> Result<()>; + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool>; + + fn finish(&mut self) -> Result<ArrayRef>; +} + +/// A thin wrapper whose only job is to extract a specific path from a variant value and pass the +/// result to a nested builder. +#[allow(unused)] +struct VariantPathRowBuilder<'a, T: VariantShreddingRowBuilder> { + builder: T, + path: VariantPath<'a>, +} + +impl<T: VariantShreddingRowBuilder> VariantShreddingRowBuilder for VariantPathRowBuilder<'_, T> { + fn append_null(&mut self) -> Result<()> { + self.builder.append_null() + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool> { + if let Some(v) = value.get_path(&self.path) { + self.builder.append_value(&v) + } else { + self.builder.append_null()?; + Ok(false) + } + } + fn finish(&mut self) -> Result<ArrayRef> { + self.builder.finish() + } +} + +/// Helper trait for converting `Variant` values to arrow primitive values. 
+#[allow(unused)] +trait VariantAsPrimitive<T: ArrowPrimitiveType> { + fn as_primitive(&self) -> Option<T::Native>; +} +impl VariantAsPrimitive<datatypes::Int32Type> for Variant<'_, '_> { + fn as_primitive(&self) -> Option<i32> { + self.as_int32() + } +} +impl VariantAsPrimitive<datatypes::Float64Type> for Variant<'_, '_> { + fn as_primitive(&self) -> Option<f64> { + self.as_f64() + } +} + +/// Builder for shredding variant values to primitive values +#[allow(unused)] +struct PrimitiveVariantShreddingRowBuilder<T: ArrowPrimitiveType> { + builder: arrow::array::PrimitiveBuilder<T>, +} + +impl<T> VariantShreddingRowBuilder for PrimitiveVariantShreddingRowBuilder<T> +where + T: ArrowPrimitiveType, + for<'m, 'v> Variant<'m, 'v>: VariantAsPrimitive<T>, +{ + fn append_null(&mut self) -> Result<()> { + self.builder.append_null(); + Ok(()) + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool> { + if let Some(v) = value.as_primitive() { + self.builder.append_value(v); + Ok(true) + } else { + self.builder.append_null(); // TODO: handle casting failure + Ok(false) + } + } + + fn finish(&mut self) -> Result<ArrayRef> { + Ok(Arc::new(self.builder.finish())) + } +} + +/// Builder for appending raw binary variant values to a BinaryViewArray. It copies the bytes +/// as-is, without any decoding. Review Comment: ```suggestion /// as-is, without any decoding or re-encoding of object fields and array elements. ``` ########## parquet-variant-compute/src/variant_get/output/struct_output.rs: ########## @@ -0,0 +1,371 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, AsArray as _, NullBufferBuilder}; +use arrow::datatypes; +use arrow::datatypes::{ArrowPrimitiveType, FieldRef}; +use arrow::error::{ArrowError, Result}; +use parquet_variant::{Variant, VariantObject, VariantPath}; + +use std::sync::Arc; + +#[allow(unused)] +pub(crate) fn make_shredding_row_builder( + //metadata: &BinaryViewArray, + path: VariantPath<'_>, + data_type: Option<&datatypes::DataType>, +) -> Result<Box<dyn VariantShreddingRowBuilder>> { + todo!() // wire it all up! +} + +/// Builder for shredding variant values into strongly typed Arrow arrays. +/// +/// Useful for variant_get kernels that need to extract specific paths from variant values, possibly +/// with casting of leaf values to specific types. +#[allow(unused)] +pub(crate) trait VariantShreddingRowBuilder { + fn append_null(&mut self) -> Result<()>; + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool>; + + fn finish(&mut self) -> Result<ArrayRef>; +} + +/// A thin wrapper whose only job is to extract a specific path from a variant value and pass the +/// result to a nested builder. 
+#[allow(unused)] +struct VariantPathRowBuilder<'a, T: VariantShreddingRowBuilder> { + builder: T, + path: VariantPath<'a>, +} + +impl<T: VariantShreddingRowBuilder> VariantShreddingRowBuilder for VariantPathRowBuilder<'_, T> { + fn append_null(&mut self) -> Result<()> { + self.builder.append_null() + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool> { + if let Some(v) = value.get_path(&self.path) { + self.builder.append_value(&v) + } else { + self.builder.append_null()?; + Ok(false) + } + } + fn finish(&mut self) -> Result<ArrayRef> { + self.builder.finish() + } +} + +/// Helper trait for converting `Variant` values to arrow primitive values. +#[allow(unused)] +trait VariantAsPrimitive<T: ArrowPrimitiveType> { + fn as_primitive(&self) -> Option<T::Native>; +} +impl VariantAsPrimitive<datatypes::Int32Type> for Variant<'_, '_> { + fn as_primitive(&self) -> Option<i32> { + self.as_int32() + } +} +impl VariantAsPrimitive<datatypes::Float64Type> for Variant<'_, '_> { + fn as_primitive(&self) -> Option<f64> { + self.as_f64() + } +} + +/// Builder for shredding variant values to primitive values +#[allow(unused)] +struct PrimitiveVariantShreddingRowBuilder<T: ArrowPrimitiveType> { + builder: arrow::array::PrimitiveBuilder<T>, +} + +impl<T> VariantShreddingRowBuilder for PrimitiveVariantShreddingRowBuilder<T> +where + T: ArrowPrimitiveType, + for<'m, 'v> Variant<'m, 'v>: VariantAsPrimitive<T>, +{ + fn append_null(&mut self) -> Result<()> { + self.builder.append_null(); + Ok(()) + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool> { + if let Some(v) = value.as_primitive() { + self.builder.append_value(v); + Ok(true) + } else { + self.builder.append_null(); // TODO: handle casting failure + Ok(false) + } + } + + fn finish(&mut self) -> Result<ArrayRef> { + Ok(Arc::new(self.builder.finish())) + } +} + +/// Builder for appending raw binary variant values to a BinaryViewArray. It copies the bytes +/// as-is, without any decoding. 
+#[allow(unused)] +struct BinaryVariantRowBuilder { + nulls: NullBufferBuilder, +} + +impl VariantShreddingRowBuilder for BinaryVariantRowBuilder { + fn append_null(&mut self) -> Result<()> { + self.nulls.append_null(); + Ok(()) + } + fn append_value(&mut self, _value: &Variant<'_, '_>) -> Result<bool> { + // We need a way to convert a Variant directly to bytes. In particular, we want to just copy + // across the underlying value byte slice of a `Variant::Object` or `Variant::List`, without + // any interaction with a `VariantMetadata` (because we will just reuse the existing one). + // + // One could _probably_ emulate this with parquet_variant::VariantBuilder, but it would do a + // lot of unnecessary work and would also create a new metadata column we don't need. Review Comment: Attn @alamb. This feels like a big gap in current functionality, and relates to my previous attempt at * https://github.com/apache/arrow-rs/pull/7915 Though I'm not convinced that other PR is taking the right approach -- we don't want to "build" complex values at all, but rather just copy over their internal byte slice as-is; primitive values would still have to be re-encoded because they were already decoded as `Variant` in the first place... but the cost there is minimal. ########## parquet-variant/src/path.rs: ########## @@ -109,6 +109,12 @@ impl<'a> From<usize> for VariantPath<'a> { } } +impl<'a> From<&[VariantPathElement<'a>]> for VariantPath<'a> { Review Comment: `IntoIterator` (below) requires owned values, but iterating over a slice yields references, so this separate impl is needed. ########## parquet-variant-compute/src/variant_get/output/struct_output.rs: ########## @@ -0,0 +1,371 @@ // Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, AsArray as _, NullBufferBuilder}; +use arrow::datatypes; +use arrow::datatypes::{ArrowPrimitiveType, FieldRef}; +use arrow::error::{ArrowError, Result}; +use parquet_variant::{Variant, VariantObject, VariantPath}; + +use std::sync::Arc; + +#[allow(unused)] +pub(crate) fn make_shredding_row_builder( + //metadata: &BinaryViewArray, + path: VariantPath<'_>, + data_type: Option<&datatypes::DataType>, +) -> Result<Box<dyn VariantShreddingRowBuilder>> { + todo!() // wire it all up! +} + +/// Builder for shredding variant values into strongly typed Arrow arrays. +/// +/// Useful for variant_get kernels that need to extract specific paths from variant values, possibly +/// with casting of leaf values to specific types. +#[allow(unused)] +pub(crate) trait VariantShreddingRowBuilder { + fn append_null(&mut self) -> Result<()>; + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool>; + + fn finish(&mut self) -> Result<ArrayRef>; +} + +/// A thin wrapper whose only job is to extract a specific path from a variant value and pass the +/// result to a nested builder. 
+#[allow(unused)] +struct VariantPathRowBuilder<'a, T: VariantShreddingRowBuilder> { + builder: T, + path: VariantPath<'a>, +} + +impl<T: VariantShreddingRowBuilder> VariantShreddingRowBuilder for VariantPathRowBuilder<'_, T> { + fn append_null(&mut self) -> Result<()> { + self.builder.append_null() + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool> { + if let Some(v) = value.get_path(&self.path) { + self.builder.append_value(&v) + } else { + self.builder.append_null()?; + Ok(false) + } + } + fn finish(&mut self) -> Result<ArrayRef> { + self.builder.finish() + } +} + +/// Helper trait for converting `Variant` values to arrow primitive values. +#[allow(unused)] +trait VariantAsPrimitive<T: ArrowPrimitiveType> { + fn as_primitive(&self) -> Option<T::Native>; +} +impl VariantAsPrimitive<datatypes::Int32Type> for Variant<'_, '_> { + fn as_primitive(&self) -> Option<i32> { + self.as_int32() + } +} +impl VariantAsPrimitive<datatypes::Float64Type> for Variant<'_, '_> { + fn as_primitive(&self) -> Option<f64> { + self.as_f64() + } +} + +/// Builder for shredding variant values to primitive values +#[allow(unused)] +struct PrimitiveVariantShreddingRowBuilder<T: ArrowPrimitiveType> { + builder: arrow::array::PrimitiveBuilder<T>, +} + +impl<T> VariantShreddingRowBuilder for PrimitiveVariantShreddingRowBuilder<T> +where + T: ArrowPrimitiveType, + for<'m, 'v> Variant<'m, 'v>: VariantAsPrimitive<T>, +{ + fn append_null(&mut self) -> Result<()> { + self.builder.append_null(); + Ok(()) + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool> { + if let Some(v) = value.as_primitive() { + self.builder.append_value(v); + Ok(true) + } else { + self.builder.append_null(); // TODO: handle casting failure + Ok(false) + } + } + + fn finish(&mut self) -> Result<ArrayRef> { + Ok(Arc::new(self.builder.finish())) + } +} + +/// Builder for appending raw binary variant values to a BinaryViewArray. It copies the bytes +/// as-is, without any decoding. 
+#[allow(unused)] +struct BinaryVariantRowBuilder { + nulls: NullBufferBuilder, +} + +impl VariantShreddingRowBuilder for BinaryVariantRowBuilder { + fn append_null(&mut self) -> Result<()> { + self.nulls.append_null(); + Ok(()) + } + fn append_value(&mut self, _value: &Variant<'_, '_>) -> Result<bool> { + // We need a way to convert a Variant directly to bytes. In particular, we want to just copy + // across the underlying value byte slice of a `Variant::Object` or `Variant::List`, without + // any interaction with a `VariantMetadata` (because we will just reuse the existing one). + // + // One could _probably_ emulate this with parquet_variant::VariantBuilder, but it would do a + // lot of unnecessary work and would also create a new metadata column we don't need. + todo!() + } + + fn finish(&mut self) -> Result<ArrayRef> { + // What `finish` does will depend strongly on how `append_value` ends up working. But + // ultimately we'll create and return a `VariantArray` instance. + todo!() + } +} + +/// Builder that extracts a struct. Casting failures produce NULL or error according to options. +#[allow(unused)] +struct StructVariantShreddingRowBuilder { + nulls: NullBufferBuilder, + field_builders: Vec<(FieldRef, Box<dyn VariantShreddingRowBuilder>)>, +} + +impl VariantShreddingRowBuilder for StructVariantShreddingRowBuilder { + fn append_null(&mut self) -> Result<()> { + for (_, builder) in &mut self.field_builders { + builder.append_null()?; + } + self.nulls.append_null(); + Ok(()) + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool> { + // Casting failure if it's not even an object. + let Variant::Object(value) = value else { + // TODO: handle casting failure + self.append_null()?; + return Ok(false); + }; + + // Process each field. If the field is missing, it becomes NULL. If the field is present, + // the child builder handles it from there, and a failed cast could produce NULL or error. 
+ // + // TODO: This loop costs `O(m lg n)` where `m` is the number of fields in this builder and + // `n` is the number of fields in the variant object we're probing. Given that `m` and `n` + // could both be large -- indepentently of each other -- we should consider doing something + // more clever that bounds the cost to O(m + n). + for (field, builder) in &mut self.field_builders { + match value.get(field.name()) { + None => builder.append_null()?, + Some(v) => { + builder.append_value(&v)?; + } + } + } + self.nulls.append_non_null(); + Ok(true) + } + + fn finish(&mut self) -> Result<ArrayRef> { + let mut fields = Vec::with_capacity(self.field_builders.len()); + let mut arrays = Vec::with_capacity(self.field_builders.len()); + for (field, mut builder) in std::mem::take(&mut self.field_builders) { + fields.push(field); + arrays.push(builder.finish()?); + } + Ok(Arc::new(arrow::array::StructArray::try_new( + fields.into(), + arrays, + self.nulls.finish(), + )?)) + } +} + +/// Used for actual shredding of binary variant values into shredded variant values +#[allow(unused)] +struct ShreddedVariantRowBuilder { + metadata: arrow::array::BinaryViewArray, + nulls: NullBufferBuilder, + value_builder: BinaryVariantRowBuilder, + typed_value_builder: Box<dyn VariantShreddingRowBuilder>, +} + +impl VariantShreddingRowBuilder for ShreddedVariantRowBuilder { + fn append_null(&mut self) -> Result<()> { + self.nulls.append_null(); + self.value_builder.append_value(&Variant::Null)?; + self.typed_value_builder.append_null() + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool> { + self.nulls.append_non_null(); + if self.typed_value_builder.append_value(value)? 
{ + // spec: (value: NULL, typed_value: non-NULL => value is present and shredded) + self.value_builder.append_null()?; + } else { + // spec: (value: non-NULL, typed_value: NULL => value is present and unshredded) + self.value_builder.append_value(value)?; + } + Ok(true) + } + + fn finish(&mut self) -> Result<ArrayRef> { + let value = self.value_builder.finish()?; + let Some(value) = value.as_byte_view_opt() else { + return Err(ArrowError::InvalidArgumentError( + "Invalid VariantArray: value builder must produce a BinaryViewArray".to_string(), + )); + }; + Ok(Arc::new(crate::VariantArray::from_parts( + self.metadata.clone(), + Some(value.clone()), // TODO: How to consume an ArrayRef directly? + Some(self.typed_value_builder.finish()?), + self.nulls.finish(), + ))) + } +} + +/// Like VariantShreddingRowBuilder, but for (partially shredded) structs which need special +/// handling on a per-field basis. +#[allow(unused)] +struct VariantShreddingStructRowBuilder { + metadata: arrow::array::BinaryViewArray, + nulls: NullBufferBuilder, + value_builder: BinaryVariantRowBuilder, Review Comment: This is a placeholder... I'm not quite sure what form the value builder (and its corresponding null masks) should actually take? ########## parquet-variant-compute/src/variant_array.rs: ########## @@ -229,107 +328,138 @@ pub enum ShreddingState { // TODO: add missing state where there is neither value nor typed_value // Missing { metadata: BinaryViewArray }, /// This variant has no typed_value field - Unshredded { - metadata: BinaryViewArray, - value: BinaryViewArray, - }, + Unshredded { value: BinaryViewArray }, /// This variant has a typed_value field and no value field /// meaning it is the shredded type - Typed { - metadata: BinaryViewArray, - typed_value: ArrayRef, - }, - /// Partially shredded: - /// * value is an object - /// * typed_value is a shredded object. 
+ PerfectlyShredded { typed_value: ArrayRef }, Review Comment: Variant shredding is very confusion-prone because some aspects of it are column-oriented and others are row-oriented. The notion of "perfect shredding" is mostly columnar: If a column shredded perfectly, then all values are present in the `typed_value` column, and the `value` column is all-NULL (and can be omitted in usual parquet fashion). NOTE: If the shredded type is nullable, then rows with a SQL NULL are encoded as `typed_value=NULL` and `value=Variant::Null` -- not NULL/NULL. This pathfinding PR takes advantage of the columnar nature of shredding to do pathing through shredded fields at the column level (no variant conversions or builders needed). Only if a requested path hits a binary variant field are we forced to drop into row-oriented variant output builder operations. Meanwhile, the "partial shredding" concept hits both columns and rows simultaneously, but with help from the shredding spec to again make it mostly columnar as well: The `typed_value` column must be a struct. Meanwhile, the `value` column could contain any type of variant value (as usual), but with two big extras: 1. If the `value` column contains a variant object, that object _only_ contains fields not present in the shredded struct. So if I asked for `v:x` and `v.typed_value.x` exists, then the spec says I can assume that `v.value:x` is _always_ NULL and I don't even need to look there -- a NULL `v.typed_value.x` means the value is, in fact, SQL NULL. 2. For any row that shredded perfectly (see above), the `value` column is allowed to take a NULL value instead of `Variant::Null`. And if the entire column was perfectly shredded then the (all-NULL) `value` column can be omitted entirely (as is usual for parquet -- readers will just infer all-NULL if that column is requested). So, the hypothetical `ShreddingState::Missing` enum variant would refer to a partially-shredded struct that turned out to shred perfectly.
########## parquet-variant-compute/src/variant_array.rs: ########## @@ -166,23 +180,19 @@ impl VariantArray { /// caller to ensure that the metadata and value were constructed correctly. pub fn value(&self, index: usize) -> Variant<'_, '_> { match &self.shredding_state { - ShreddingState::Unshredded { metadata, value } => { - Variant::new(metadata.value(index), value.value(index)) + ShreddingState::Unshredded { value } => { + Variant::new(self.metadata.value(index), value.value(index)) } - ShreddingState::Typed { typed_value, .. } => { + ShreddingState::PerfectlyShredded { typed_value, .. } => { Review Comment: See commentary below for an explanation of these renamed enum variants. ########## parquet-variant-compute/src/variant_get/mod.rs: ########## @@ -15,50 +15,208 @@ // specific language governing permissions and limitations // under the License. use arrow::{ - array::{Array, ArrayRef}, + array::{self, Array, ArrayRef, BinaryViewArray, StructArray}, compute::CastOptions, error::Result, }; -use arrow_schema::{ArrowError, FieldRef}; -use parquet_variant::VariantPath; +use arrow_schema::{ArrowError, DataType, FieldRef}; +use parquet_variant::{VariantPath, VariantPathElement}; use crate::variant_array::ShreddingState; -use crate::variant_get::output::instantiate_output_builder; -use crate::VariantArray; +use crate::{variant_array::ShreddedVariantFieldArray, VariantArray}; + +use std::sync::Arc; mod output; +pub(crate) enum ShreddedPathStep<'a> { + /// Path step succeeded, return the new shredding state + Success(&'a ShreddingState), + /// The path element is not present in the `typed_value` column and there is no `value` column, + /// so we we know it does not exist. It, and all paths under it, are all-NULL. + Missing, + /// The path element is not present in the `typed_value` and must be retrieved from the `value` + /// column instead. 
The caller should be prepared to handle any value, including the requested + /// type, an arbitrary "wrong" type, or `Variant::Null`. + NotShredded, +} + +/// Given a shredded variant field -- a `(value?, typed_value?)` pair -- try to take one path step +/// deeper. For a `VariantPathElement::Field`, the step fails if there is no `typed_value` at this +/// level, or if `typed_value` is not a struct, or if the requested field name does not exist. +/// +/// TODO: Support `VariantPathElement::Index`? It wouldn't be easy, and maybe not even possible. +pub(crate) fn follow_shredded_path_element<'a>( + shredding_state: &'a ShreddingState, + path_element: &VariantPathElement<'_>, +) -> Result<ShreddedPathStep<'a>> { + // If the requested path element 's not present in `typed_value`, and `value` is missing, then + // we know it does not exist; it, and all paths under it, are all-NULL. + let missing_path_step = || { + if shredding_state.value_field().is_none() { + ShreddedPathStep::Missing + } else { + ShreddedPathStep::NotShredded + } + }; + + let Some(typed_value) = shredding_state.typed_value_field() else { + return Ok(missing_path_step()); + }; + + match path_element { + VariantPathElement::Field { name } => { + // Try to step into the requested field name of a struct. + let Some(field) = typed_value + .as_any() + .downcast_ref::<StructArray>() + .and_then(|typed_value| typed_value.column_by_name(name)) + else { + return Ok(missing_path_step()); + }; + + let field = field + .as_any() + .downcast_ref::<ShreddedVariantFieldArray>() Review Comment: This is the only place (in this PR, at least) where we step into a shredded field and need to care what arrow Array type it has. But other than validating that it has a valid subset of typed_value and value fields, it's really not much of a thing. Eventually `VariantArray::typed_value_to_variant` will have to care as well, tho. So we do need to figure out what the official approach should be. 
########## parquet-variant-compute/src/variant_array.rs: ########## @@ -102,31 +105,42 @@ impl VariantArray { ))); }; - // Find the value field, if present - let value = inner - .column_by_name("value") - .map(|v| { - v.as_binary_view_opt().ok_or_else(|| { - ArrowError::NotYetImplemented(format!( - "VariantArray 'value' field must be BinaryView, got {}", - v.data_type() - )) - }) - }) - .transpose()?; - - // Find the typed_value field, if present - let typed_value = inner.column_by_name("typed_value"); Review Comment: Factored out into the new `VariantArray::from_parts` function below. ########## parquet-variant-compute/src/variant_array.rs: ########## @@ -229,107 +328,138 @@ pub enum ShreddingState { // TODO: add missing state where there is neither value nor typed_value // Missing { metadata: BinaryViewArray }, /// This variant has no typed_value field - Unshredded { - metadata: BinaryViewArray, - value: BinaryViewArray, - }, + Unshredded { value: BinaryViewArray }, /// This variant has a typed_value field and no value field /// meaning it is the shredded type - Typed { - metadata: BinaryViewArray, - typed_value: ArrayRef, - }, - /// Partially shredded: - /// * value is an object - /// * typed_value is a shredded object. + PerfectlyShredded { typed_value: ArrayRef }, + /// Imperfectly shredded: Shredded values reside in `typed_value` while those that failed to + /// shred reside in `value`. Missing field values are NULL in both columns, while NULL primitive + /// values have NULL `typed_value` and `Variant::Null` in `value`. /// - /// Note the spec says "Writers must not produce data where both value and - /// typed_value are non-null, unless the Variant value is an object." - PartiallyShredded { - metadata: BinaryViewArray, + /// NOTE: A partially shredded struct is a special kind of imperfect shredding, where + /// `typed_value` and `value` are both non-NULL. 
The `typed_value` is a struct containing the + /// subset of fields for which shredding was attempted (each field will then have its own value + /// and/or typed_value sub-fields that indicate how shredding actually turned out). Meanwhile, + /// the `value` is a variant object containing the subset of fields for which shredding was + /// not even attempted. + ImperfectlyShredded { value: BinaryViewArray, typed_value: ArrayRef, }, } impl ShreddingState { /// try to create a new `ShreddingState` from the given fields - pub fn try_new( - metadata: BinaryViewArray, - value: Option<BinaryViewArray>, - typed_value: Option<ArrayRef>, - ) -> Result<Self, ArrowError> { - match (metadata, value, typed_value) { - (metadata, Some(value), Some(typed_value)) => Ok(Self::PartiallyShredded { - metadata, - value, - typed_value, - }), - (metadata, Some(value), None) => Ok(Self::Unshredded { metadata, value }), - (metadata, None, Some(typed_value)) => Ok(Self::Typed { - metadata, - typed_value, - }), - (_metadata_field, None, None) => Err(ArrowError::InvalidArgumentError(String::from( + pub fn try_new(inner: &StructArray) -> Result<Self, ArrowError> { + // Note the specification allows for any order so we must search by name + + // Find the value field, if present + let value = inner + .column_by_name("value") + .map(|v| { + v.as_binary_view_opt().ok_or_else(|| { + ArrowError::NotYetImplemented(format!( + "VariantArray 'value' field must be BinaryView, got {}", + v.data_type() + )) + }) + }) + .transpose()? 
+ .cloned(); + + // Find the typed_value field, if present + let typed_value = inner.column_by_name("typed_value").cloned(); + + match (value, typed_value) { + (Some(value), Some(typed_value)) => { + Ok(Self::ImperfectlyShredded { value, typed_value }) + } + (Some(value), None) => Ok(Self::Unshredded { value }), + (None, Some(typed_value)) => Ok(Self::PerfectlyShredded { typed_value }), + (None, None) => Err(ArrowError::InvalidArgumentError(String::from( "VariantArray has neither value nor typed_value field", ))), } } - /// Return a reference to the metadata field - pub fn metadata_field(&self) -> &BinaryViewArray { - match self { - ShreddingState::Unshredded { metadata, .. } => metadata, - ShreddingState::Typed { metadata, .. } => metadata, - ShreddingState::PartiallyShredded { metadata, .. } => metadata, - } - } - /// Return a reference to the value field, if present pub fn value_field(&self) -> Option<&BinaryViewArray> { match self { ShreddingState::Unshredded { value, .. } => Some(value), - ShreddingState::Typed { .. } => None, - ShreddingState::PartiallyShredded { value, .. } => Some(value), + ShreddingState::PerfectlyShredded { .. } => None, + ShreddingState::ImperfectlyShredded { value, .. } => Some(value), } } /// Return a reference to the typed_value field, if present pub fn typed_value_field(&self) -> Option<&ArrayRef> { match self { ShreddingState::Unshredded { .. } => None, - ShreddingState::Typed { typed_value, .. } => Some(typed_value), - ShreddingState::PartiallyShredded { typed_value, .. } => Some(typed_value), + ShreddingState::PerfectlyShredded { typed_value, .. } => Some(typed_value), + ShreddingState::ImperfectlyShredded { typed_value, .. 
} => Some(typed_value), } } /// Slice all the underlying arrays pub fn slice(&self, offset: usize, length: usize) -> Self { match self { - ShreddingState::Unshredded { metadata, value } => ShreddingState::Unshredded { - metadata: metadata.slice(offset, length), - value: value.slice(offset, length), - }, - ShreddingState::Typed { - metadata, - typed_value, - } => ShreddingState::Typed { - metadata: metadata.slice(offset, length), - typed_value: typed_value.slice(offset, length), - }, - ShreddingState::PartiallyShredded { - metadata, - value, - typed_value, - } => ShreddingState::PartiallyShredded { - metadata: metadata.slice(offset, length), + ShreddingState::Unshredded { value } => ShreddingState::Unshredded { value: value.slice(offset, length), - typed_value: typed_value.slice(offset, length), }, + ShreddingState::PerfectlyShredded { typed_value } => { + ShreddingState::PerfectlyShredded { + typed_value: typed_value.slice(offset, length), + } + } + ShreddingState::ImperfectlyShredded { value, typed_value } => { + ShreddingState::ImperfectlyShredded { + value: value.slice(offset, length), + typed_value: typed_value.slice(offset, length), + } + } } } } +/// Builds struct arrays from component fields +/// +/// TODO: move to arrow crate +#[derive(Debug, Default, Clone)] +pub struct StructArrayBuilder { Review Comment: This test class was very handy so I stole it. TBD whether/where it should actually live in prod code? 
########## parquet-variant-compute/src/variant_array.rs: ########## @@ -192,7 +202,96 @@ impl VariantArray { /// Return a reference to the metadata field of the [`StructArray`] pub fn metadata_field(&self) -> &BinaryViewArray { - self.shredding_state.metadata_field() + &self.metadata + } + + /// Return a reference to the value field of the `StructArray` + pub fn value_field(&self) -> Option<&BinaryViewArray> { + self.shredding_state.value_field() + } + + /// Return a reference to the typed_value field of the `StructArray`, if present + pub fn typed_value_field(&self) -> Option<&ArrayRef> { + self.shredding_state.typed_value_field() + } +} + +/// One shredded field of a partially or prefectly shredded variant. For example, suppose the +/// shredding schema for variant `v` treats it as an object with a single field `a`, where `a` is +/// itself a struct with the single field `b` of type INT. Then the physical layout of the column +/// is: +/// +/// ```text +/// v: VARIANT { +/// metadata: BINARY, +/// value: BINARY, +/// typed_value: STRUCT { +/// a: SHREDDED_VARIANT_FIELD { +/// value: BINARY, +/// typed_value: STRUCT { +/// a: SHREDDED_VARIANT_FIELD { +/// value: BINARY, +/// typed_value: INT, +/// }, +/// }, +/// }, +/// }, +/// } +/// ``` +/// +/// In the above, each row of `v.value` is either a variant value (shredding failed, `v` was not an +/// object at all) or a variant object (partial shredding, `v` was an object but included unexpected +/// fields other than `a`), or is NULL (perfect shredding, `v` was an object containing only the +/// single expected field `a`). +/// +/// A similar story unfolds for each `v.typed_value.a.value` -- a variant value if shredding failed +/// (`v:a` was not an object at all), or a variant object (`v:a` was an object with unexpected +/// additional fields), or NULL (`v:a` was an object containing only the single expected field `b`). 
+/// +/// Finally, `v.typed_value.a.typed_value.b.value` is either NULL (`v:a.b` was an integer) or else a +/// variant value. +pub struct ShreddedVariantFieldArray { Review Comment: This is a marginally useful new Array that represents a shredded variant field. Unlike the top-level variant, it has only `(value?, typed_value?)` fields -- no metadata. The variant shredding spec mandates this, and I hesitate to "just" reuse `VariantArray` with its `metadata` field. Writing that out to a parquet file would violate the spec, which seems like too big of a footgun and a large departure from typical arrow arrays that physically match the structure of the underlying parquet. I say "marginally useful" because: * Without a `metadata` column, one cannot actually define a `value` method for it. * The pathfinding code in this PR only uses it as a thin container -- all the code is built around the shared `ShreddingState` concept. * There is only one use site, in `follow_shredded_path_element`, so we could work directly with `StructArray` and just validate it manually. ########## parquet-variant-compute/src/variant_get/mod.rs: ########## @@ -15,50 +15,208 @@ // specific language governing permissions and limitations // under the License. 
use arrow::{ - array::{Array, ArrayRef}, + array::{self, Array, ArrayRef, BinaryViewArray, StructArray}, compute::CastOptions, error::Result, }; -use arrow_schema::{ArrowError, FieldRef}; -use parquet_variant::VariantPath; +use arrow_schema::{ArrowError, DataType, FieldRef}; +use parquet_variant::{VariantPath, VariantPathElement}; use crate::variant_array::ShreddingState; -use crate::variant_get::output::instantiate_output_builder; -use crate::VariantArray; +use crate::{variant_array::ShreddedVariantFieldArray, VariantArray}; + +use std::sync::Arc; mod output; +pub(crate) enum ShreddedPathStep<'a> { + /// Path step succeeded, return the new shredding state + Success(&'a ShreddingState), + /// The path element is not present in the `typed_value` column and there is no `value` column, + /// so we we know it does not exist. It, and all paths under it, are all-NULL. + Missing, + /// The path element is not present in the `typed_value` and must be retrieved from the `value` + /// column instead. The caller should be prepared to handle any value, including the requested + /// type, an arbitrary "wrong" type, or `Variant::Null`. + NotShredded, +} + +/// Given a shredded variant field -- a `(value?, typed_value?)` pair -- try to take one path step +/// deeper. For a `VariantPathElement::Field`, the step fails if there is no `typed_value` at this +/// level, or if `typed_value` is not a struct, or if the requested field name does not exist. +/// +/// TODO: Support `VariantPathElement::Index`? It wouldn't be easy, and maybe not even possible. +pub(crate) fn follow_shredded_path_element<'a>( + shredding_state: &'a ShreddingState, + path_element: &VariantPathElement<'_>, +) -> Result<ShreddedPathStep<'a>> { + // If the requested path element 's not present in `typed_value`, and `value` is missing, then + // we know it does not exist; it, and all paths under it, are all-NULL. 
+ let missing_path_step = || { + if shredding_state.value_field().is_none() { + ShreddedPathStep::Missing + } else { + ShreddedPathStep::NotShredded + } + }; + + let Some(typed_value) = shredding_state.typed_value_field() else { + return Ok(missing_path_step()); + }; + + match path_element { + VariantPathElement::Field { name } => { + // Try to step into the requested field name of a struct. + let Some(field) = typed_value + .as_any() + .downcast_ref::<StructArray>() + .and_then(|typed_value| typed_value.column_by_name(name)) + else { + return Ok(missing_path_step()); + }; + + let field = field + .as_any() + .downcast_ref::<ShreddedVariantFieldArray>() + .ok_or_else(|| { + // TODO: Should we blow up? Or just end the traversal and let the normal + // variant pathing code sort out the mess that it must anyway be + // prepared to handle? + ArrowError::InvalidArgumentError(format!( + "Expected a ShreddedVariantFieldArray, got {:?} instead", + field.data_type(), + )) + })?; + + Ok(ShreddedPathStep::Success(field.shredding_state())) + } + VariantPathElement::Index { .. } => { + // TODO: Support array indexing. Among other things, it will require slicing not + // only the array we have here, but also the corresponding metadata and null masks. + Err(ArrowError::NotYetImplemented( + "Pathing into shredded variant array index".into(), + )) + } + } +} + +/// Follows the given path as far as possible through shredded variant fields. If the path ends on a +/// shredded field, return it directly. Otherwise, use a row shredder to follow the rest of the path +/// and extract the requested value on a per-row basis. +fn shredded_get_path( Review Comment: This is one of the key functions in the whole PR. It works with existing shredded columns as much as possible, descending into actual (binary) variant values only as a last resort. 
########## parquet-variant-compute/src/variant_get/mod.rs: ########## @@ -15,50 +15,208 @@ // specific language governing permissions and limitations // under the License. use arrow::{ - array::{Array, ArrayRef}, + array::{self, Array, ArrayRef, BinaryViewArray, StructArray}, compute::CastOptions, error::Result, }; -use arrow_schema::{ArrowError, FieldRef}; -use parquet_variant::VariantPath; +use arrow_schema::{ArrowError, DataType, FieldRef}; +use parquet_variant::{VariantPath, VariantPathElement}; use crate::variant_array::ShreddingState; -use crate::variant_get::output::instantiate_output_builder; -use crate::VariantArray; +use crate::{variant_array::ShreddedVariantFieldArray, VariantArray}; + +use std::sync::Arc; mod output; +pub(crate) enum ShreddedPathStep<'a> { + /// Path step succeeded, return the new shredding state + Success(&'a ShreddingState), + /// The path element is not present in the `typed_value` column and there is no `value` column, + /// so we we know it does not exist. It, and all paths under it, are all-NULL. + Missing, + /// The path element is not present in the `typed_value` and must be retrieved from the `value` + /// column instead. The caller should be prepared to handle any value, including the requested + /// type, an arbitrary "wrong" type, or `Variant::Null`. + NotShredded, +} + +/// Given a shredded variant field -- a `(value?, typed_value?)` pair -- try to take one path step +/// deeper. For a `VariantPathElement::Field`, the step fails if there is no `typed_value` at this +/// level, or if `typed_value` is not a struct, or if the requested field name does not exist. +/// +/// TODO: Support `VariantPathElement::Index`? It wouldn't be easy, and maybe not even possible. 
+pub(crate) fn follow_shredded_path_element<'a>( + shredding_state: &'a ShreddingState, + path_element: &VariantPathElement<'_>, +) -> Result<ShreddedPathStep<'a>> { + // If the requested path element 's not present in `typed_value`, and `value` is missing, then + // we know it does not exist; it, and all paths under it, are all-NULL. + let missing_path_step = || { + if shredding_state.value_field().is_none() { + ShreddedPathStep::Missing + } else { + ShreddedPathStep::NotShredded + } + }; + + let Some(typed_value) = shredding_state.typed_value_field() else { + return Ok(missing_path_step()); + }; + + match path_element { + VariantPathElement::Field { name } => { + // Try to step into the requested field name of a struct. + let Some(field) = typed_value + .as_any() + .downcast_ref::<StructArray>() + .and_then(|typed_value| typed_value.column_by_name(name)) + else { + return Ok(missing_path_step()); + }; + + let field = field + .as_any() + .downcast_ref::<ShreddedVariantFieldArray>() + .ok_or_else(|| { + // TODO: Should we blow up? Or just end the traversal and let the normal + // variant pathing code sort out the mess that it must anyway be + // prepared to handle? + ArrowError::InvalidArgumentError(format!( + "Expected a ShreddedVariantFieldArray, got {:?} instead", + field.data_type(), + )) + })?; + + Ok(ShreddedPathStep::Success(field.shredding_state())) + } + VariantPathElement::Index { .. } => { + // TODO: Support array indexing. Among other things, it will require slicing not + // only the array we have here, but also the corresponding metadata and null masks. + Err(ArrowError::NotYetImplemented( + "Pathing into shredded variant array index".into(), + )) + } + } +} + +/// Follows the given path as far as possible through shredded variant fields. If the path ends on a +/// shredded field, return it directly. Otherwise, use a row shredder to follow the rest of the path +/// and extract the requested value on a per-row basis. 
+fn shredded_get_path( + input: &VariantArray, + path: &[VariantPathElement<'_>], + as_type: Option<&DataType>, +) -> Result<ArrayRef> { + // Helper that creates a new VariantArray from the given nested value and typed_value columns, + let make_target_variant = |value: Option<BinaryViewArray>, typed_value: Option<ArrayRef>| { + let metadata = input.metadata_field().clone(); + let nulls = input.inner().nulls().cloned(); + VariantArray::from_parts( + metadata, + value, + typed_value, + nulls, + ) + }; + + // Helper that shreds a VariantArray to a specific type. + let shred_basic_variant = |target: VariantArray, path: VariantPath<'_>, as_type: Option<&DataType>| { + let mut builder = output::struct_output::make_shredding_row_builder(path, as_type)?; Review Comment: This is a new utility added by this PR. See `struct_output.rs` below. A key observation here is that variant shredding (e.g. to write to parquet) and `variant_get` with a user-specified data type are really the same operation. Or rather, `variant_get` _uses_ variant shredding. It also uses variant unshredding, in the form of `VariantArray::typed_value_to_variant`. ########## parquet-variant-compute/src/variant_get/mod.rs: ########## @@ -15,50 +15,208 @@ // specific language governing permissions and limitations // under the License. 
use arrow::{ - array::{Array, ArrayRef}, + array::{self, Array, ArrayRef, BinaryViewArray, StructArray}, compute::CastOptions, error::Result, }; -use arrow_schema::{ArrowError, FieldRef}; -use parquet_variant::VariantPath; +use arrow_schema::{ArrowError, DataType, FieldRef}; +use parquet_variant::{VariantPath, VariantPathElement}; use crate::variant_array::ShreddingState; -use crate::variant_get::output::instantiate_output_builder; -use crate::VariantArray; +use crate::{variant_array::ShreddedVariantFieldArray, VariantArray}; + +use std::sync::Arc; mod output; +pub(crate) enum ShreddedPathStep<'a> { + /// Path step succeeded, return the new shredding state + Success(&'a ShreddingState), + /// The path element is not present in the `typed_value` column and there is no `value` column, + /// so we we know it does not exist. It, and all paths under it, are all-NULL. + Missing, + /// The path element is not present in the `typed_value` and must be retrieved from the `value` + /// column instead. The caller should be prepared to handle any value, including the requested + /// type, an arbitrary "wrong" type, or `Variant::Null`. + NotShredded, +} + +/// Given a shredded variant field -- a `(value?, typed_value?)` pair -- try to take one path step +/// deeper. For a `VariantPathElement::Field`, the step fails if there is no `typed_value` at this +/// level, or if `typed_value` is not a struct, or if the requested field name does not exist. +/// +/// TODO: Support `VariantPathElement::Index`? It wouldn't be easy, and maybe not even possible. +pub(crate) fn follow_shredded_path_element<'a>( + shredding_state: &'a ShreddingState, + path_element: &VariantPathElement<'_>, +) -> Result<ShreddedPathStep<'a>> { + // If the requested path element 's not present in `typed_value`, and `value` is missing, then + // we know it does not exist; it, and all paths under it, are all-NULL. 
+ let missing_path_step = || { + if shredding_state.value_field().is_none() { + ShreddedPathStep::Missing + } else { + ShreddedPathStep::NotShredded + } + }; + + let Some(typed_value) = shredding_state.typed_value_field() else { + return Ok(missing_path_step()); + }; + + match path_element { + VariantPathElement::Field { name } => { + // Try to step into the requested field name of a struct. + let Some(field) = typed_value + .as_any() + .downcast_ref::<StructArray>() + .and_then(|typed_value| typed_value.column_by_name(name)) + else { + return Ok(missing_path_step()); + }; + + let field = field + .as_any() + .downcast_ref::<ShreddedVariantFieldArray>() + .ok_or_else(|| { + // TODO: Should we blow up? Or just end the traversal and let the normal + // variant pathing code sort out the mess that it must anyway be + // prepared to handle? + ArrowError::InvalidArgumentError(format!( + "Expected a ShreddedVariantFieldArray, got {:?} instead", + field.data_type(), + )) + })?; + + Ok(ShreddedPathStep::Success(field.shredding_state())) + } + VariantPathElement::Index { .. } => { + // TODO: Support array indexing. Among other things, it will require slicing not + // only the array we have here, but also the corresponding metadata and null masks. + Err(ArrowError::NotYetImplemented( + "Pathing into shredded variant array index".into(), + )) + } + } +} + +/// Follows the given path as far as possible through shredded variant fields. If the path ends on a +/// shredded field, return it directly. Otherwise, use a row shredder to follow the rest of the path +/// and extract the requested value on a per-row basis. 
+fn shredded_get_path( + input: &VariantArray, + path: &[VariantPathElement<'_>], + as_type: Option<&DataType>, +) -> Result<ArrayRef> { + // Helper that creates a new VariantArray from the given nested value and typed_value columns, + let make_target_variant = |value: Option<BinaryViewArray>, typed_value: Option<ArrayRef>| { + let metadata = input.metadata_field().clone(); + let nulls = input.inner().nulls().cloned(); + VariantArray::from_parts( + metadata, + value, + typed_value, + nulls, + ) + }; + + // Helper that shreds a VariantArray to a specific type. + let shred_basic_variant = |target: VariantArray, path: VariantPath<'_>, as_type: Option<&DataType>| { + let mut builder = output::struct_output::make_shredding_row_builder(path, as_type)?; + for i in 0..target.len() { + if target.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(&target.value(i))?; + } + } + builder.finish() + }; + + // Peel away the prefix of path elements that traverses the shredded parts of this variant + // column. Shredding will traverse the rest of the path on a per-row basis. + let mut shredding_state = input.shredding_state(); + let mut path_index = 0; + for path_element in path { + match follow_shredded_path_element(shredding_state, path_element)? { + ShreddedPathStep::Success(state) => { + shredding_state = state; + path_index += 1; + continue; Review Comment: redundant... ```suggestion ``` ########## parquet-variant-compute/src/variant_get/output/struct_output.rs: ########## @@ -0,0 +1,371 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, AsArray as _, NullBufferBuilder}; +use arrow::datatypes; +use arrow::datatypes::{ArrowPrimitiveType, FieldRef}; +use arrow::error::{ArrowError, Result}; +use parquet_variant::{Variant, VariantObject, VariantPath}; + +use std::sync::Arc; + +#[allow(unused)] +pub(crate) fn make_shredding_row_builder( + //metadata: &BinaryViewArray, + path: VariantPath<'_>, + data_type: Option<&datatypes::DataType>, +) -> Result<Box<dyn VariantShreddingRowBuilder>> { + todo!() // wire it all up! Review Comment: I _think_ this will just be a lot of annoying boilerplate to instantiate and connect the various row builders defined below... but I haven't actually tried so there could be additional challenges. ########## parquet-variant-compute/src/variant_get/mod.rs: ########## @@ -15,50 +15,208 @@ // specific language governing permissions and limitations // under the License. 
use arrow::{ - array::{Array, ArrayRef}, + array::{self, Array, ArrayRef, BinaryViewArray, StructArray}, compute::CastOptions, error::Result, }; -use arrow_schema::{ArrowError, FieldRef}; -use parquet_variant::VariantPath; +use arrow_schema::{ArrowError, DataType, FieldRef}; +use parquet_variant::{VariantPath, VariantPathElement}; use crate::variant_array::ShreddingState; -use crate::variant_get::output::instantiate_output_builder; -use crate::VariantArray; +use crate::{variant_array::ShreddedVariantFieldArray, VariantArray}; + +use std::sync::Arc; mod output; +pub(crate) enum ShreddedPathStep<'a> { + /// Path step succeeded, return the new shredding state + Success(&'a ShreddingState), + /// The path element is not present in the `typed_value` column and there is no `value` column, + /// so we we know it does not exist. It, and all paths under it, are all-NULL. + Missing, + /// The path element is not present in the `typed_value` and must be retrieved from the `value` + /// column instead. The caller should be prepared to handle any value, including the requested + /// type, an arbitrary "wrong" type, or `Variant::Null`. + NotShredded, +} + +/// Given a shredded variant field -- a `(value?, typed_value?)` pair -- try to take one path step +/// deeper. For a `VariantPathElement::Field`, the step fails if there is no `typed_value` at this +/// level, or if `typed_value` is not a struct, or if the requested field name does not exist. +/// +/// TODO: Support `VariantPathElement::Index`? It wouldn't be easy, and maybe not even possible. +pub(crate) fn follow_shredded_path_element<'a>( + shredding_state: &'a ShreddingState, + path_element: &VariantPathElement<'_>, +) -> Result<ShreddedPathStep<'a>> { + // If the requested path element 's not present in `typed_value`, and `value` is missing, then + // we know it does not exist; it, and all paths under it, are all-NULL. 
+ let missing_path_step = || { + if shredding_state.value_field().is_none() { + ShreddedPathStep::Missing + } else { + ShreddedPathStep::NotShredded + } + }; + + let Some(typed_value) = shredding_state.typed_value_field() else { + return Ok(missing_path_step()); + }; + + match path_element { + VariantPathElement::Field { name } => { + // Try to step into the requested field name of a struct. + let Some(field) = typed_value + .as_any() + .downcast_ref::<StructArray>() + .and_then(|typed_value| typed_value.column_by_name(name)) + else { + return Ok(missing_path_step()); + }; + + let field = field + .as_any() + .downcast_ref::<ShreddedVariantFieldArray>() + .ok_or_else(|| { + // TODO: Should we blow up? Or just end the traversal and let the normal + // variant pathing code sort out the mess that it must anyway be + // prepared to handle? + ArrowError::InvalidArgumentError(format!( + "Expected a ShreddedVariantFieldArray, got {:?} instead", + field.data_type(), + )) + })?; + + Ok(ShreddedPathStep::Success(field.shredding_state())) + } + VariantPathElement::Index { .. } => { + // TODO: Support array indexing. Among other things, it will require slicing not + // only the array we have here, but also the corresponding metadata and null masks. + Err(ArrowError::NotYetImplemented( + "Pathing into shredded variant array index".into(), + )) + } + } +} + +/// Follows the given path as far as possible through shredded variant fields. If the path ends on a +/// shredded field, return it directly. Otherwise, use a row shredder to follow the rest of the path +/// and extract the requested value on a per-row basis. 
+fn shredded_get_path( + input: &VariantArray, + path: &[VariantPathElement<'_>], + as_type: Option<&DataType>, +) -> Result<ArrayRef> { + // Helper that creates a new VariantArray from the given nested value and typed_value columns, + let make_target_variant = |value: Option<BinaryViewArray>, typed_value: Option<ArrayRef>| { + let metadata = input.metadata_field().clone(); + let nulls = input.inner().nulls().cloned(); + VariantArray::from_parts( + metadata, + value, + typed_value, + nulls, + ) + }; + + // Helper that shreds a VariantArray to a specific type. + let shred_basic_variant = |target: VariantArray, path: VariantPath<'_>, as_type: Option<&DataType>| { + let mut builder = output::struct_output::make_shredding_row_builder(path, as_type)?; + for i in 0..target.len() { + if target.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(&target.value(i))?; + } + } + builder.finish() + }; + + // Peel away the prefix of path elements that traverses the shredded parts of this variant + // column. Shredding will traverse the rest of the path on a per-row basis. + let mut shredding_state = input.shredding_state(); + let mut path_index = 0; + for path_element in path { + match follow_shredded_path_element(shredding_state, path_element)? { + ShreddedPathStep::Success(state) => { + shredding_state = state; + path_index += 1; + continue; + } + ShreddedPathStep::Missing => { + let num_rows = input.len(); + let arr = match as_type { + Some(data_type) => Arc::new(array::new_null_array(data_type, num_rows)) as _, + None => Arc::new(array::NullArray::new(num_rows)) as _, + }; + return Ok(arr); + } + ShreddedPathStep::NotShredded => { + let target = make_target_variant(shredding_state.value_field().cloned(), None); + return shred_basic_variant(target, path[path_index..].into(), as_type); + } + }; + } + + // Path exhausted! Create a new `VariantArray` for the location we landed on. 
+ let target = make_target_variant( + shredding_state.value_field().cloned(), + shredding_state.typed_value_field().cloned(), + ); + + // If our caller did not request any specific type, we can just return whatever we landed on. + let Some(data_type) = as_type else { + return Ok(Arc::new(target)); + }; + + // Structs are special. Recurse into each field separately, hoping to follow the shredding even + // further, and build up the final struct from those individually shredded results. + if let DataType::Struct(fields) = data_type { + let children = fields + .iter() + .map(|field| { + shredded_get_path( + &target, + &[VariantPathElement::from(field.name().as_str())], + Some(field.data_type()), + ) + }) + .collect::<Result<Vec<_>>>()?; + + return Ok(Arc::new(StructArray::try_new( + fields.clone(), + children, + target.nulls().cloned(), + )?)); + } + + // Not a struct, so directly shred the variant as the requested type + shred_basic_variant(target, VariantPath::default(), as_type) +} + /// Returns an array with the specified path extracted from the variant values. /// /// The return array type depends on the `as_type` field of the options parameter /// 1. `as_type: None`: a VariantArray is returned. The values in this new VariantArray will point /// to the specified path. /// 2. `as_type: Some(<specific field>)`: an array of the specified type is returned. +/// +/// TODO: How would a caller request a struct or list type where the fields/elements can be any +/// variant? Caller can pass None as the requested type to fetch a specific path, but it would +/// quickly become annoying (and inefficient) to call `variant_get` for each leaf value in a struct or +/// list and then try to assemble the results. 
pub fn variant_get(input: &ArrayRef, options: GetOptions) -> Result<ArrayRef> { let variant_array: &VariantArray = input.as_any().downcast_ref().ok_or_else(|| { ArrowError::InvalidArgumentError( "expected a VariantArray as the input for variant_get".to_owned(), ) })?; - // Create the output writer based on the specified output options - let output_builder = instantiate_output_builder(options.clone())?; + let GetOptions { as_type, path, .. } = &options; Review Comment: NOTE: This PR makes no attempt whatsoever to handle options that control casting semantics or errors. ########## parquet-variant-compute/src/variant_get/mod.rs: ########## @@ -15,50 +15,208 @@ // specific language governing permissions and limitations // under the License. use arrow::{ - array::{Array, ArrayRef}, + array::{self, Array, ArrayRef, BinaryViewArray, StructArray}, compute::CastOptions, error::Result, }; -use arrow_schema::{ArrowError, FieldRef}; -use parquet_variant::VariantPath; +use arrow_schema::{ArrowError, DataType, FieldRef}; +use parquet_variant::{VariantPath, VariantPathElement}; use crate::variant_array::ShreddingState; -use crate::variant_get::output::instantiate_output_builder; -use crate::VariantArray; +use crate::{variant_array::ShreddedVariantFieldArray, VariantArray}; + +use std::sync::Arc; mod output; +pub(crate) enum ShreddedPathStep<'a> { + /// Path step succeeded, return the new shredding state + Success(&'a ShreddingState), + /// The path element is not present in the `typed_value` column and there is no `value` column, + /// so we we know it does not exist. It, and all paths under it, are all-NULL. + Missing, + /// The path element is not present in the `typed_value` and must be retrieved from the `value` + /// column instead. The caller should be prepared to handle any value, including the requested + /// type, an arbitrary "wrong" type, or `Variant::Null`. 
+ NotShredded, +} + +/// Given a shredded variant field -- a `(value?, typed_value?)` pair -- try to take one path step +/// deeper. For a `VariantPathElement::Field`, the step fails if there is no `typed_value` at this +/// level, or if `typed_value` is not a struct, or if the requested field name does not exist. +/// +/// TODO: Support `VariantPathElement::Index`? It wouldn't be easy, and maybe not even possible. +pub(crate) fn follow_shredded_path_element<'a>( + shredding_state: &'a ShreddingState, + path_element: &VariantPathElement<'_>, +) -> Result<ShreddedPathStep<'a>> { + // If the requested path element 's not present in `typed_value`, and `value` is missing, then + // we know it does not exist; it, and all paths under it, are all-NULL. + let missing_path_step = || { + if shredding_state.value_field().is_none() { + ShreddedPathStep::Missing + } else { + ShreddedPathStep::NotShredded + } + }; + + let Some(typed_value) = shredding_state.typed_value_field() else { + return Ok(missing_path_step()); + }; + + match path_element { + VariantPathElement::Field { name } => { + // Try to step into the requested field name of a struct. + let Some(field) = typed_value + .as_any() + .downcast_ref::<StructArray>() + .and_then(|typed_value| typed_value.column_by_name(name)) + else { + return Ok(missing_path_step()); + }; + + let field = field + .as_any() + .downcast_ref::<ShreddedVariantFieldArray>() + .ok_or_else(|| { + // TODO: Should we blow up? Or just end the traversal and let the normal + // variant pathing code sort out the mess that it must anyway be + // prepared to handle? + ArrowError::InvalidArgumentError(format!( + "Expected a ShreddedVariantFieldArray, got {:?} instead", + field.data_type(), + )) + })?; + + Ok(ShreddedPathStep::Success(field.shredding_state())) + } + VariantPathElement::Index { .. } => { + // TODO: Support array indexing. 
Among other things, it will require slicing not + // only the array we have here, but also the corresponding metadata and null masks. + Err(ArrowError::NotYetImplemented( + "Pathing into shredded variant array index".into(), + )) + } + } +} + +/// Follows the given path as far as possible through shredded variant fields. If the path ends on a +/// shredded field, return it directly. Otherwise, use a row shredder to follow the rest of the path +/// and extract the requested value on a per-row basis. +fn shredded_get_path( + input: &VariantArray, + path: &[VariantPathElement<'_>], + as_type: Option<&DataType>, +) -> Result<ArrayRef> { + // Helper that creates a new VariantArray from the given nested value and typed_value columns, + let make_target_variant = |value: Option<BinaryViewArray>, typed_value: Option<ArrayRef>| { + let metadata = input.metadata_field().clone(); + let nulls = input.inner().nulls().cloned(); + VariantArray::from_parts( + metadata, + value, + typed_value, + nulls, + ) + }; + + // Helper that shreds a VariantArray to a specific type. + let shred_basic_variant = |target: VariantArray, path: VariantPath<'_>, as_type: Option<&DataType>| { + let mut builder = output::struct_output::make_shredding_row_builder(path, as_type)?; + for i in 0..target.len() { + if target.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(&target.value(i))?; + } + } + builder.finish() + }; + + // Peel away the prefix of path elements that traverses the shredded parts of this variant + // column. Shredding will traverse the rest of the path on a per-row basis. + let mut shredding_state = input.shredding_state(); + let mut path_index = 0; + for path_element in path { + match follow_shredded_path_element(shredding_state, path_element)? 
{ + ShreddedPathStep::Success(state) => { + shredding_state = state; + path_index += 1; + continue; + } + ShreddedPathStep::Missing => { + let num_rows = input.len(); + let arr = match as_type { + Some(data_type) => Arc::new(array::new_null_array(data_type, num_rows)) as _, + None => Arc::new(array::NullArray::new(num_rows)) as _, + }; + return Ok(arr); + } + ShreddedPathStep::NotShredded => { + let target = make_target_variant(shredding_state.value_field().cloned(), None); + return shred_basic_variant(target, path[path_index..].into(), as_type); + } + }; + } + + // Path exhausted! Create a new `VariantArray` for the location we landed on. + let target = make_target_variant( + shredding_state.value_field().cloned(), + shredding_state.typed_value_field().cloned(), + ); + + // If our caller did not request any specific type, we can just return whatever we landed on. + let Some(data_type) = as_type else { + return Ok(Arc::new(target)); + }; + + // Structs are special. Recurse into each field separately, hoping to follow the shredding even + // further, and build up the final struct from those individually shredded results. + if let DataType::Struct(fields) = data_type { + let children = fields + .iter() + .map(|field| { + shredded_get_path( + &target, + &[VariantPathElement::from(field.name().as_str())], + Some(field.data_type()), + ) + }) + .collect::<Result<Vec<_>>>()?; + + return Ok(Arc::new(StructArray::try_new( + fields.clone(), + children, + target.nulls().cloned(), + )?)); + } + + // Not a struct, so directly shred the variant as the requested type + shred_basic_variant(target, VariantPath::default(), as_type) +} + /// Returns an array with the specified path extracted from the variant values. /// /// The return array type depends on the `as_type` field of the options parameter /// 1. `as_type: None`: a VariantArray is returned. The values in this new VariantArray will point /// to the specified path. /// 2. 
`as_type: Some(<specific field>)`: an array of the specified type is returned. +/// +/// TODO: How would a caller request a struct or list type where the fields/elements can be any +/// variant? Caller can pass None as the requested type to fetch a specific path, but it would +/// quickly become annoying (and inefficient) to call `variant_get` for each leaf value in a struct or +/// list and then try to assemble the results. Review Comment: This is a big open question in my mind. I'd hate to have to (un)shred to a specific type just because there's no way to say "give me what you've got"... only to cast/shred _again_. The need arises, for example, when trying to deserialize a free-form JSON field. My code may have a specific schema it likes to work with, but if I cast directly to that schema the cast could fail (or impose unwanted casting semantics). But I also don't want to fetch 10k unwanted fields that a given variant object might contain, so I want to specify a struct with specific leaf fields of interest to project out. ########## parquet-variant-compute/src/variant_get/output/struct_output.rs: ########## Review Comment: This is almost certainly the wrong place for this code. And it shouldn't all be in one file, either -- actually implementing all of the functionality sketched out here would probably sprawl over several sub-modules. ########## parquet-variant-compute/src/variant_get/mod.rs: ########## @@ -15,50 +15,208 @@ // specific language governing permissions and limitations // under the License. 
use arrow::{ - array::{Array, ArrayRef}, + array::{self, Array, ArrayRef, BinaryViewArray, StructArray}, compute::CastOptions, error::Result, }; -use arrow_schema::{ArrowError, FieldRef}; -use parquet_variant::VariantPath; +use arrow_schema::{ArrowError, DataType, FieldRef}; +use parquet_variant::{VariantPath, VariantPathElement}; use crate::variant_array::ShreddingState; -use crate::variant_get::output::instantiate_output_builder; -use crate::VariantArray; +use crate::{variant_array::ShreddedVariantFieldArray, VariantArray}; + +use std::sync::Arc; mod output; +pub(crate) enum ShreddedPathStep<'a> { + /// Path step succeeded, return the new shredding state + Success(&'a ShreddingState), + /// The path element is not present in the `typed_value` column and there is no `value` column, + /// so we we know it does not exist. It, and all paths under it, are all-NULL. + Missing, + /// The path element is not present in the `typed_value` and must be retrieved from the `value` + /// column instead. The caller should be prepared to handle any value, including the requested + /// type, an arbitrary "wrong" type, or `Variant::Null`. + NotShredded, +} + +/// Given a shredded variant field -- a `(value?, typed_value?)` pair -- try to take one path step +/// deeper. For a `VariantPathElement::Field`, the step fails if there is no `typed_value` at this +/// level, or if `typed_value` is not a struct, or if the requested field name does not exist. +/// +/// TODO: Support `VariantPathElement::Index`? It wouldn't be easy, and maybe not even possible. +pub(crate) fn follow_shredded_path_element<'a>( + shredding_state: &'a ShreddingState, + path_element: &VariantPathElement<'_>, +) -> Result<ShreddedPathStep<'a>> { + // If the requested path element 's not present in `typed_value`, and `value` is missing, then + // we know it does not exist; it, and all paths under it, are all-NULL. 
+ let missing_path_step = || { + if shredding_state.value_field().is_none() { + ShreddedPathStep::Missing + } else { + ShreddedPathStep::NotShredded + } + }; + + let Some(typed_value) = shredding_state.typed_value_field() else { + return Ok(missing_path_step()); + }; + + match path_element { + VariantPathElement::Field { name } => { + // Try to step into the requested field name of a struct. + let Some(field) = typed_value + .as_any() + .downcast_ref::<StructArray>() + .and_then(|typed_value| typed_value.column_by_name(name)) + else { + return Ok(missing_path_step()); + }; + + let field = field + .as_any() + .downcast_ref::<ShreddedVariantFieldArray>() + .ok_or_else(|| { + // TODO: Should we blow up? Or just end the traversal and let the normal + // variant pathing code sort out the mess that it must anyway be + // prepared to handle? + ArrowError::InvalidArgumentError(format!( + "Expected a ShreddedVariantFieldArray, got {:?} instead", + field.data_type(), + )) + })?; + + Ok(ShreddedPathStep::Success(field.shredding_state())) + } + VariantPathElement::Index { .. } => { + // TODO: Support array indexing. Among other things, it will require slicing not + // only the array we have here, but also the corresponding metadata and null masks. + Err(ArrowError::NotYetImplemented( + "Pathing into shredded variant array index".into(), + )) + } + } +} + +/// Follows the given path as far as possible through shredded variant fields. If the path ends on a +/// shredded field, return it directly. Otherwise, use a row shredder to follow the rest of the path +/// and extract the requested value on a per-row basis. 
+fn shredded_get_path( + input: &VariantArray, + path: &[VariantPathElement<'_>], + as_type: Option<&DataType>, +) -> Result<ArrayRef> { + // Helper that creates a new VariantArray from the given nested value and typed_value columns, + let make_target_variant = |value: Option<BinaryViewArray>, typed_value: Option<ArrayRef>| { + let metadata = input.metadata_field().clone(); + let nulls = input.inner().nulls().cloned(); + VariantArray::from_parts( + metadata, + value, + typed_value, + nulls, + ) + }; + + // Helper that shreds a VariantArray to a specific type. + let shred_basic_variant = |target: VariantArray, path: VariantPath<'_>, as_type: Option<&DataType>| { + let mut builder = output::struct_output::make_shredding_row_builder(path, as_type)?; + for i in 0..target.len() { + if target.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(&target.value(i))?; + } + } + builder.finish() + }; + + // Peel away the prefix of path elements that traverses the shredded parts of this variant + // column. Shredding will traverse the rest of the path on a per-row basis. + let mut shredding_state = input.shredding_state(); + let mut path_index = 0; + for path_element in path { + match follow_shredded_path_element(shredding_state, path_element)? { + ShreddedPathStep::Success(state) => { + shredding_state = state; + path_index += 1; + continue; + } + ShreddedPathStep::Missing => { + let num_rows = input.len(); + let arr = match as_type { + Some(data_type) => Arc::new(array::new_null_array(data_type, num_rows)) as _, + None => Arc::new(array::NullArray::new(num_rows)) as _, + }; + return Ok(arr); + } + ShreddedPathStep::NotShredded => { + let target = make_target_variant(shredding_state.value_field().cloned(), None); + return shred_basic_variant(target, path[path_index..].into(), as_type); + } + }; + } + + // Path exhausted! Create a new `VariantArray` for the location we landed on. Review Comment: ```suggestion // Path exhausted! 
Create a new `VariantArray` for the target location we landed on. ``` ########## parquet-variant-compute/src/variant_get/output/struct_output.rs: ########## @@ -0,0 +1,371 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, AsArray as _, NullBufferBuilder}; +use arrow::datatypes; +use arrow::datatypes::{ArrowPrimitiveType, FieldRef}; +use arrow::error::{ArrowError, Result}; +use parquet_variant::{Variant, VariantObject, VariantPath}; + +use std::sync::Arc; + +#[allow(unused)] +pub(crate) fn make_shredding_row_builder( + //metadata: &BinaryViewArray, + path: VariantPath<'_>, + data_type: Option<&datatypes::DataType>, +) -> Result<Box<dyn VariantShreddingRowBuilder>> { + todo!() // wire it all up! +} + +/// Builder for shredding variant values into strongly typed Arrow arrays. +/// +/// Useful for variant_get kernels that need to extract specific paths from variant values, possibly +/// with casting of leaf values to specific types. 
+#[allow(unused)] +pub(crate) trait VariantShreddingRowBuilder { + fn append_null(&mut self) -> Result<()>; + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool>; Review Comment: This method returns `Ok(false)` in case shredding failed and the cast was not configured to raise an error. Useful for actual shredding, where the outcome of `typed_column` append decides what we append to the `value` column. See `ShreddedVariantRowBuilder::append_value` below for an example. ########## parquet-variant-compute/src/variant_get/output/struct_output.rs: ########## @@ -0,0 +1,371 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, AsArray as _, NullBufferBuilder}; +use arrow::datatypes; +use arrow::datatypes::{ArrowPrimitiveType, FieldRef}; +use arrow::error::{ArrowError, Result}; +use parquet_variant::{Variant, VariantObject, VariantPath}; + +use std::sync::Arc; + +#[allow(unused)] +pub(crate) fn make_shredding_row_builder( + //metadata: &BinaryViewArray, + path: VariantPath<'_>, + data_type: Option<&datatypes::DataType>, +) -> Result<Box<dyn VariantShreddingRowBuilder>> { + todo!() // wire it all up! 
+} + +/// Builder for shredding variant values into strongly typed Arrow arrays. +/// +/// Useful for variant_get kernels that need to extract specific paths from variant values, possibly +/// with casting of leaf values to specific types. +#[allow(unused)] +pub(crate) trait VariantShreddingRowBuilder { + fn append_null(&mut self) -> Result<()>; + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool>; + + fn finish(&mut self) -> Result<ArrayRef>; +} Review Comment: This is another key piece of the PR's pathfinding -- a row-oriented shredding trait that takes variant values in and eventually produces a strongly-typed array of some kind. That output array could be anything from a primitive array (perfect shredding) to shredded primitive (value, typed_value) pairs to a struct of shredded fields to a binary variant. ########## parquet-variant-compute/src/variant_get/mod.rs: ########## @@ -15,50 +15,208 @@ // specific language governing permissions and limitations // under the License. use arrow::{ - array::{Array, ArrayRef}, + array::{self, Array, ArrayRef, BinaryViewArray, StructArray}, compute::CastOptions, error::Result, }; -use arrow_schema::{ArrowError, FieldRef}; -use parquet_variant::VariantPath; +use arrow_schema::{ArrowError, DataType, FieldRef}; +use parquet_variant::{VariantPath, VariantPathElement}; use crate::variant_array::ShreddingState; -use crate::variant_get::output::instantiate_output_builder; -use crate::VariantArray; +use crate::{variant_array::ShreddedVariantFieldArray, VariantArray}; + +use std::sync::Arc; mod output; +pub(crate) enum ShreddedPathStep<'a> { + /// Path step succeeded, return the new shredding state + Success(&'a ShreddingState), + /// The path element is not present in the `typed_value` column and there is no `value` column, + /// so we we know it does not exist. It, and all paths under it, are all-NULL.
+ Missing, + /// The path element is not present in the `typed_value` and must be retrieved from the `value` + /// column instead. The caller should be prepared to handle any value, including the requested + /// type, an arbitrary "wrong" type, or `Variant::Null`. + NotShredded, +} + +/// Given a shredded variant field -- a `(value?, typed_value?)` pair -- try to take one path step +/// deeper. For a `VariantPathElement::Field`, the step fails if there is no `typed_value` at this +/// level, or if `typed_value` is not a struct, or if the requested field name does not exist. +/// +/// TODO: Support `VariantPathElement::Index`? It wouldn't be easy, and maybe not even possible. +pub(crate) fn follow_shredded_path_element<'a>( + shredding_state: &'a ShreddingState, + path_element: &VariantPathElement<'_>, +) -> Result<ShreddedPathStep<'a>> { + // If the requested path element 's not present in `typed_value`, and `value` is missing, then + // we know it does not exist; it, and all paths under it, are all-NULL. + let missing_path_step = || { + if shredding_state.value_field().is_none() { + ShreddedPathStep::Missing + } else { + ShreddedPathStep::NotShredded + } + }; + + let Some(typed_value) = shredding_state.typed_value_field() else { + return Ok(missing_path_step()); + }; + + match path_element { + VariantPathElement::Field { name } => { + // Try to step into the requested field name of a struct. + let Some(field) = typed_value + .as_any() + .downcast_ref::<StructArray>() + .and_then(|typed_value| typed_value.column_by_name(name)) + else { + return Ok(missing_path_step()); + }; + + let field = field + .as_any() + .downcast_ref::<ShreddedVariantFieldArray>() + .ok_or_else(|| { + // TODO: Should we blow up? Or just end the traversal and let the normal + // variant pathing code sort out the mess that it must anyway be + // prepared to handle? 
+ ArrowError::InvalidArgumentError(format!( + "Expected a ShreddedVariantFieldArray, got {:?} instead", + field.data_type(), + )) + })?; + + Ok(ShreddedPathStep::Success(field.shredding_state())) + } + VariantPathElement::Index { .. } => { + // TODO: Support array indexing. Among other things, it will require slicing not + // only the array we have here, but also the corresponding metadata and null masks. + Err(ArrowError::NotYetImplemented( + "Pathing into shredded variant array index".into(), + )) + } + } +} + +/// Follows the given path as far as possible through shredded variant fields. If the path ends on a +/// shredded field, return it directly. Otherwise, use a row shredder to follow the rest of the path +/// and extract the requested value on a per-row basis. +fn shredded_get_path( + input: &VariantArray, + path: &[VariantPathElement<'_>], + as_type: Option<&DataType>, +) -> Result<ArrayRef> { + // Helper that creates a new VariantArray from the given nested value and typed_value columns, + let make_target_variant = |value: Option<BinaryViewArray>, typed_value: Option<ArrayRef>| { + let metadata = input.metadata_field().clone(); + let nulls = input.inner().nulls().cloned(); + VariantArray::from_parts( + metadata, + value, + typed_value, + nulls, + ) + }; + + // Helper that shreds a VariantArray to a specific type. + let shred_basic_variant = |target: VariantArray, path: VariantPath<'_>, as_type: Option<&DataType>| { + let mut builder = output::struct_output::make_shredding_row_builder(path, as_type)?; + for i in 0..target.len() { + if target.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(&target.value(i))?; + } + } + builder.finish() + }; + + // Peel away the prefix of path elements that traverses the shredded parts of this variant + // column. Shredding will traverse the rest of the path on a per-row basis. 
+ let mut shredding_state = input.shredding_state(); + let mut path_index = 0; + for path_element in path { + match follow_shredded_path_element(shredding_state, path_element)? { + ShreddedPathStep::Success(state) => { + shredding_state = state; + path_index += 1; + continue; + } + ShreddedPathStep::Missing => { + let num_rows = input.len(); + let arr = match as_type { + Some(data_type) => Arc::new(array::new_null_array(data_type, num_rows)) as _, + None => Arc::new(array::NullArray::new(num_rows)) as _, + }; + return Ok(arr); + } + ShreddedPathStep::NotShredded => { + let target = make_target_variant(shredding_state.value_field().cloned(), None); + return shred_basic_variant(target, path[path_index..].into(), as_type); + } + }; + } + + // Path exhausted! Create a new `VariantArray` for the location we landed on. + let target = make_target_variant( + shredding_state.value_field().cloned(), + shredding_state.typed_value_field().cloned(), + ); + + // If our caller did not request any specific type, we can just return whatever we landed on. + let Some(data_type) = as_type else { + return Ok(Arc::new(target)); + }; + + // Structs are special. Recurse into each field separately, hoping to follow the shredding even + // further, and build up the final struct from those individually shredded results. + if let DataType::Struct(fields) = data_type { + let children = fields + .iter() + .map(|field| { + shredded_get_path( + &target, + &[VariantPathElement::from(field.name().as_str())], + Some(field.data_type()), + ) + }) + .collect::<Result<Vec<_>>>()?; + + return Ok(Arc::new(StructArray::try_new( + fields.clone(), + children, + target.nulls().cloned(), + )?)); + } + + // Not a struct, so directly shred the variant as the requested type + shred_basic_variant(target, VariantPath::default(), as_type) +} + /// Returns an array with the specified path extracted from the variant values. /// /// The return array type depends on the `as_type` field of the options parameter /// 1. 
`as_type: None`: a VariantArray is returned. The values in this new VariantArray will point /// to the specified path. /// 2. `as_type: Some(<specific field>)`: an array of the specified type is returned. +/// +/// TODO: How would a caller request a struct or list type where the fields/elements can be any +/// variant? Caller can pass None as the requested type to fetch a specific path, but it would +/// quickly become annoying (and inefficient) to call `variant_get` for each leaf value in a struct or +/// list and then try to assemble the results. pub fn variant_get(input: &ArrayRef, options: GetOptions) -> Result<ArrayRef> { let variant_array: &VariantArray = input.as_any().downcast_ref().ok_or_else(|| { ArrowError::InvalidArgumentError( "expected a VariantArray as the input for variant_get".to_owned(), ) })?; - // Create the output writer based on the specified output options - let output_builder = instantiate_output_builder(options.clone())?; Review Comment: This PR completely ignores the existing start at output builders. I'm not sure if I didn't understand how they should work, but they didn't seem to fit what I was trying to explore. TBD how to harmonize/reconcile this. ########## parquet-variant-compute/src/variant_get/output/struct_output.rs: ########## @@ -0,0 +1,371 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, AsArray as _, NullBufferBuilder}; +use arrow::datatypes; +use arrow::datatypes::{ArrowPrimitiveType, FieldRef}; +use arrow::error::{ArrowError, Result}; +use parquet_variant::{Variant, VariantObject, VariantPath}; + +use std::sync::Arc; + +#[allow(unused)] +pub(crate) fn make_shredding_row_builder( + //metadata: &BinaryViewArray, + path: VariantPath<'_>, + data_type: Option<&datatypes::DataType>, +) -> Result<Box<dyn VariantShreddingRowBuilder>> { + todo!() // wire it all up! +} + +/// Builder for shredding variant values into strongly typed Arrow arrays. +/// +/// Useful for variant_get kernels that need to extract specific paths from variant values, possibly +/// with casting of leaf values to specific types. +#[allow(unused)] +pub(crate) trait VariantShreddingRowBuilder { + fn append_null(&mut self) -> Result<()>; + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool>; + + fn finish(&mut self) -> Result<ArrayRef>; +} + +/// A thin wrapper whose only job is to extract a specific path from a variant value and pass the +/// result to a nested builder. 
+#[allow(unused)] +struct VariantPathRowBuilder<'a, T: VariantShreddingRowBuilder> { + builder: T, + path: VariantPath<'a>, +} + +impl<T: VariantShreddingRowBuilder> VariantShreddingRowBuilder for VariantPathRowBuilder<'_, T> { + fn append_null(&mut self) -> Result<()> { + self.builder.append_null() + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool> { + if let Some(v) = value.get_path(&self.path) { + self.builder.append_value(&v) + } else { + self.builder.append_null()?; + Ok(false) + } + } + fn finish(&mut self) -> Result<ArrayRef> { + self.builder.finish() + } +} + +/// Helper trait for converting `Variant` values to arrow primitive values. +#[allow(unused)] +trait VariantAsPrimitive<T: ArrowPrimitiveType> { + fn as_primitive(&self) -> Option<T::Native>; +} +impl VariantAsPrimitive<datatypes::Int32Type> for Variant<'_, '_> { + fn as_primitive(&self) -> Option<i32> { + self.as_int32() + } +} +impl VariantAsPrimitive<datatypes::Float64Type> for Variant<'_, '_> { + fn as_primitive(&self) -> Option<f64> { + self.as_f64() + } +} + +/// Builder for shredding variant values to primitive values +#[allow(unused)] +struct PrimitiveVariantShreddingRowBuilder<T: ArrowPrimitiveType> { + builder: arrow::array::PrimitiveBuilder<T>, +} + +impl<T> VariantShreddingRowBuilder for PrimitiveVariantShreddingRowBuilder<T> +where + T: ArrowPrimitiveType, + for<'m, 'v> Variant<'m, 'v>: VariantAsPrimitive<T>, +{ + fn append_null(&mut self) -> Result<()> { + self.builder.append_null(); + Ok(()) + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool> { + if let Some(v) = value.as_primitive() { + self.builder.append_value(v); + Ok(true) + } else { + self.builder.append_null(); // TODO: handle casting failure + Ok(false) + } + } + + fn finish(&mut self) -> Result<ArrayRef> { + Ok(Arc::new(self.builder.finish())) + } +} + +/// Builder for appending raw binary variant values to a BinaryViewArray. It copies the bytes +/// as-is, without any decoding. 
+#[allow(unused)] +struct BinaryVariantRowBuilder { + nulls: NullBufferBuilder, +} + +impl VariantShreddingRowBuilder for BinaryVariantRowBuilder { + fn append_null(&mut self) -> Result<()> { + self.nulls.append_null(); + Ok(()) + } + fn append_value(&mut self, _value: &Variant<'_, '_>) -> Result<bool> { + // We need a way to convert a Variant directly to bytes. In particular, we want to just copy + // across the underlying value byte slice of a `Variant::Object` or `Variant::List`, without + // any interaction with a `VariantMetadata` (because we will just reuse the existing one). + // + // One could _probably_ emulate this with parquet_variant::VariantBuilder, but it would do a + // lot of unnecessary work and would also create a new metadata column we don't need. + todo!() + } + + fn finish(&mut self) -> Result<ArrayRef> { + // What `finish` does will depend strongly on how `append_value` ends up working. But + // ultimately we'll create and return a `VariantArray` instance. + todo!() + } +} + +/// Builder that extracts a struct. Casting failures produce NULL or error according to options. +#[allow(unused)] +struct StructVariantShreddingRowBuilder { Review Comment: Terrible name, but I couldn't think of a better one. Even worse name below for partially shredded structs... ########## parquet-variant-compute/src/variant_get/output/struct_output.rs: ########## @@ -0,0 +1,371 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, AsArray as _, NullBufferBuilder}; +use arrow::datatypes; +use arrow::datatypes::{ArrowPrimitiveType, FieldRef}; +use arrow::error::{ArrowError, Result}; +use parquet_variant::{Variant, VariantObject, VariantPath}; + +use std::sync::Arc; + +#[allow(unused)] +pub(crate) fn make_shredding_row_builder( + //metadata: &BinaryViewArray, + path: VariantPath<'_>, + data_type: Option<&datatypes::DataType>, +) -> Result<Box<dyn VariantShreddingRowBuilder>> { + todo!() // wire it all up! +} + +/// Builder for shredding variant values into strongly typed Arrow arrays. +/// +/// Useful for variant_get kernels that need to extract specific paths from variant values, possibly +/// with casting of leaf values to specific types. +#[allow(unused)] +pub(crate) trait VariantShreddingRowBuilder { + fn append_null(&mut self) -> Result<()>; + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool>; + + fn finish(&mut self) -> Result<ArrayRef>; +} + +/// A thin wrapper whose only job is to extract a specific path from a variant value and pass the +/// result to a nested builder. 
+#[allow(unused)] +struct VariantPathRowBuilder<'a, T: VariantShreddingRowBuilder> { + builder: T, + path: VariantPath<'a>, +} + +impl<T: VariantShreddingRowBuilder> VariantShreddingRowBuilder for VariantPathRowBuilder<'_, T> { + fn append_null(&mut self) -> Result<()> { + self.builder.append_null() + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool> { + if let Some(v) = value.get_path(&self.path) { + self.builder.append_value(&v) + } else { + self.builder.append_null()?; + Ok(false) + } + } + fn finish(&mut self) -> Result<ArrayRef> { + self.builder.finish() + } +} + +/// Helper trait for converting `Variant` values to arrow primitive values. +#[allow(unused)] +trait VariantAsPrimitive<T: ArrowPrimitiveType> { + fn as_primitive(&self) -> Option<T::Native>; +} +impl VariantAsPrimitive<datatypes::Int32Type> for Variant<'_, '_> { + fn as_primitive(&self) -> Option<i32> { + self.as_int32() + } +} +impl VariantAsPrimitive<datatypes::Float64Type> for Variant<'_, '_> { + fn as_primitive(&self) -> Option<f64> { + self.as_f64() + } +} + +/// Builder for shredding variant values to primitive values +#[allow(unused)] +struct PrimitiveVariantShreddingRowBuilder<T: ArrowPrimitiveType> { + builder: arrow::array::PrimitiveBuilder<T>, +} + +impl<T> VariantShreddingRowBuilder for PrimitiveVariantShreddingRowBuilder<T> +where + T: ArrowPrimitiveType, + for<'m, 'v> Variant<'m, 'v>: VariantAsPrimitive<T>, +{ + fn append_null(&mut self) -> Result<()> { + self.builder.append_null(); + Ok(()) + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool> { + if let Some(v) = value.as_primitive() { + self.builder.append_value(v); + Ok(true) + } else { + self.builder.append_null(); // TODO: handle casting failure + Ok(false) + } + } + + fn finish(&mut self) -> Result<ArrayRef> { + Ok(Arc::new(self.builder.finish())) + } +} + +/// Builder for appending raw binary variant values to a BinaryViewArray. It copies the bytes +/// as-is, without any decoding. 
+#[allow(unused)] +struct BinaryVariantRowBuilder { + nulls: NullBufferBuilder, +} + +impl VariantShreddingRowBuilder for BinaryVariantRowBuilder { + fn append_null(&mut self) -> Result<()> { + self.nulls.append_null(); + Ok(()) + } + fn append_value(&mut self, _value: &Variant<'_, '_>) -> Result<bool> { + // We need a way to convert a Variant directly to bytes. In particular, we want to just copy + // across the underlying value byte slice of a `Variant::Object` or `Variant::List`, without + // any interaction with a `VariantMetadata` (because we will just reuse the existing one). + // + // One could _probably_ emulate this with parquet_variant::VariantBuilder, but it would do a + // lot of unnecessary work and would also create a new metadata column we don't need. + todo!() + } + + fn finish(&mut self) -> Result<ArrayRef> { + // What `finish` does will depend strongly on how `append_value` ends up working. But + // ultimately we'll create and return a `VariantArray` instance. + todo!() + } +} + +/// Builder that extracts a struct. Casting failures produce NULL or error according to options. +#[allow(unused)] +struct StructVariantShreddingRowBuilder { + nulls: NullBufferBuilder, + field_builders: Vec<(FieldRef, Box<dyn VariantShreddingRowBuilder>)>, +} + +impl VariantShreddingRowBuilder for StructVariantShreddingRowBuilder { + fn append_null(&mut self) -> Result<()> { + for (_, builder) in &mut self.field_builders { + builder.append_null()?; + } + self.nulls.append_null(); + Ok(()) + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool> { + // Casting failure if it's not even an object. + let Variant::Object(value) = value else { + // TODO: handle casting failure + self.append_null()?; + return Ok(false); + }; + + // Process each field. If the field is missing, it becomes NULL. If the field is present, + // the child builder handles it from there, and a failed cast could produce NULL or error. 
+ // + // TODO: This loop costs `O(m lg n)` where `m` is the number of fields in this builder and + // `n` is the number of fields in the variant object we're probing. Given that `m` and `n` + // could both be large -- indepentently of each other -- we should consider doing something + // more clever that bounds the cost to O(m + n). + for (field, builder) in &mut self.field_builders { + match value.get(field.name()) { + None => builder.append_null()?, + Some(v) => { + builder.append_value(&v)?; + } + } + } + self.nulls.append_non_null(); + Ok(true) + } + + fn finish(&mut self) -> Result<ArrayRef> { + let mut fields = Vec::with_capacity(self.field_builders.len()); + let mut arrays = Vec::with_capacity(self.field_builders.len()); + for (field, mut builder) in std::mem::take(&mut self.field_builders) { + fields.push(field); + arrays.push(builder.finish()?); + } + Ok(Arc::new(arrow::array::StructArray::try_new( + fields.into(), + arrays, + self.nulls.finish(), + )?)) + } +} + +/// Used for actual shredding of binary variant values into shredded variant values +#[allow(unused)] +struct ShreddedVariantRowBuilder { + metadata: arrow::array::BinaryViewArray, + nulls: NullBufferBuilder, + value_builder: BinaryVariantRowBuilder, + typed_value_builder: Box<dyn VariantShreddingRowBuilder>, +} + +impl VariantShreddingRowBuilder for ShreddedVariantRowBuilder { + fn append_null(&mut self) -> Result<()> { + self.nulls.append_null(); + self.value_builder.append_value(&Variant::Null)?; + self.typed_value_builder.append_null() + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool> { + self.nulls.append_non_null(); + if self.typed_value_builder.append_value(value)? 
{ + // spec: (value: NULL, typed_value: non-NULL => value is present and shredded) + self.value_builder.append_null()?; + } else { + // spec: (value: non-NULL, typed_value: NULL => value is present and unshredded) + self.value_builder.append_value(value)?; + } + Ok(true) + } + + fn finish(&mut self) -> Result<ArrayRef> { + let value = self.value_builder.finish()?; + let Some(value) = value.as_byte_view_opt() else { + return Err(ArrowError::InvalidArgumentError( + "Invalid VariantArray: value builder must produce a BinaryViewArray".to_string(), + )); + }; + Ok(Arc::new(crate::VariantArray::from_parts( + self.metadata.clone(), + Some(value.clone()), // TODO: How to consume an ArrayRef directly? + Some(self.typed_value_builder.finish()?), + self.nulls.finish(), + ))) + } +} + +/// Like VariantShreddingRowBuilder, but for (partially shredded) structs which need special +/// handling on a per-field basis. +#[allow(unused)] +struct VariantShreddingStructRowBuilder { + metadata: arrow::array::BinaryViewArray, + nulls: NullBufferBuilder, + value_builder: BinaryVariantRowBuilder, + typed_value_field_builders: Vec<(FieldRef, Box<dyn VariantShreddingRowBuilder>)>, + typed_value_nulls: NullBufferBuilder, +} + +#[allow(unused)] +impl VariantShreddingStructRowBuilder { + fn append_null_typed_value(&mut self) -> Result<()> { + for (_, builder) in &mut self.typed_value_field_builders { + builder.append_null()?; + } + self.typed_value_nulls.append_null(); + Ok(()) + } + + // Co-iterate over all fields of both the input value object and the target `typed_value` + // schema, effectively performing full outer merge join by field name. + // + // NOTE: At most one of the two options can be empty. 
+ fn merge_join_fields<'a>( + &'a mut self, + _value: &'a VariantObject<'a, 'a>, + ) -> impl Iterator< + Item = ( + Option<&'a Variant<'a, 'a>>, + &'a str, + Option<&'a mut dyn VariantShreddingRowBuilder>, + ), + > { + std::iter::empty() Review Comment: ```suggestion std::iter::empty() // TODO ``` ########## parquet-variant-compute/src/variant_get/output/struct_output.rs: ########## @@ -0,0 +1,371 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, AsArray as _, NullBufferBuilder}; +use arrow::datatypes; +use arrow::datatypes::{ArrowPrimitiveType, FieldRef}; +use arrow::error::{ArrowError, Result}; +use parquet_variant::{Variant, VariantObject, VariantPath}; + +use std::sync::Arc; + +#[allow(unused)] +pub(crate) fn make_shredding_row_builder( + //metadata: &BinaryViewArray, + path: VariantPath<'_>, + data_type: Option<&datatypes::DataType>, +) -> Result<Box<dyn VariantShreddingRowBuilder>> { + todo!() // wire it all up! +} + +/// Builder for shredding variant values into strongly typed Arrow arrays. +/// +/// Useful for variant_get kernels that need to extract specific paths from variant values, possibly +/// with casting of leaf values to specific types. 
+#[allow(unused)] +pub(crate) trait VariantShreddingRowBuilder { + fn append_null(&mut self) -> Result<()>; + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool>; + + fn finish(&mut self) -> Result<ArrayRef>; +} + +/// A thin wrapper whose only job is to extract a specific path from a variant value and pass the +/// result to a nested builder. +#[allow(unused)] +struct VariantPathRowBuilder<'a, T: VariantShreddingRowBuilder> { + builder: T, + path: VariantPath<'a>, +} + +impl<T: VariantShreddingRowBuilder> VariantShreddingRowBuilder for VariantPathRowBuilder<'_, T> { + fn append_null(&mut self) -> Result<()> { + self.builder.append_null() + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool> { + if let Some(v) = value.get_path(&self.path) { + self.builder.append_value(&v) + } else { + self.builder.append_null()?; + Ok(false) + } + } + fn finish(&mut self) -> Result<ArrayRef> { + self.builder.finish() + } +} + +/// Helper trait for converting `Variant` values to arrow primitive values. 
+#[allow(unused)] +trait VariantAsPrimitive<T: ArrowPrimitiveType> { + fn as_primitive(&self) -> Option<T::Native>; +} +impl VariantAsPrimitive<datatypes::Int32Type> for Variant<'_, '_> { + fn as_primitive(&self) -> Option<i32> { + self.as_int32() + } +} +impl VariantAsPrimitive<datatypes::Float64Type> for Variant<'_, '_> { + fn as_primitive(&self) -> Option<f64> { + self.as_f64() + } +} + +/// Builder for shredding variant values to primitive values +#[allow(unused)] +struct PrimitiveVariantShreddingRowBuilder<T: ArrowPrimitiveType> { + builder: arrow::array::PrimitiveBuilder<T>, +} + +impl<T> VariantShreddingRowBuilder for PrimitiveVariantShreddingRowBuilder<T> +where + T: ArrowPrimitiveType, + for<'m, 'v> Variant<'m, 'v>: VariantAsPrimitive<T>, +{ + fn append_null(&mut self) -> Result<()> { + self.builder.append_null(); + Ok(()) + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool> { + if let Some(v) = value.as_primitive() { + self.builder.append_value(v); + Ok(true) + } else { + self.builder.append_null(); // TODO: handle casting failure + Ok(false) + } + } + + fn finish(&mut self) -> Result<ArrayRef> { + Ok(Arc::new(self.builder.finish())) + } +} + +/// Builder for appending raw binary variant values to a BinaryViewArray. It copies the bytes +/// as-is, without any decoding. +#[allow(unused)] +struct BinaryVariantRowBuilder { + nulls: NullBufferBuilder, +} + +impl VariantShreddingRowBuilder for BinaryVariantRowBuilder { + fn append_null(&mut self) -> Result<()> { + self.nulls.append_null(); + Ok(()) + } + fn append_value(&mut self, _value: &Variant<'_, '_>) -> Result<bool> { + // We need a way to convert a Variant directly to bytes. In particular, we want to just copy + // across the underlying value byte slice of a `Variant::Object` or `Variant::List`, without + // any interaction with a `VariantMetadata` (because we will just reuse the existing one). 
+ // + // One could _probably_ emulate this with parquet_variant::VariantBuilder, but it would do a + // lot of unnecessary work and would also create a new metadata column we don't need. + todo!() + } + + fn finish(&mut self) -> Result<ArrayRef> { + // What `finish` does will depend strongly on how `append_value` ends up working. But + // ultimately we'll create and return a `VariantArray` instance. + todo!() + } +} + +/// Builder that extracts a struct. Casting failures produce NULL or error according to options. +#[allow(unused)] +struct StructVariantShreddingRowBuilder { + nulls: NullBufferBuilder, + field_builders: Vec<(FieldRef, Box<dyn VariantShreddingRowBuilder>)>, +} + +impl VariantShreddingRowBuilder for StructVariantShreddingRowBuilder { + fn append_null(&mut self) -> Result<()> { + for (_, builder) in &mut self.field_builders { + builder.append_null()?; + } + self.nulls.append_null(); + Ok(()) + } + + fn append_value(&mut self, value: &Variant<'_, '_>) -> Result<bool> { + // Casting failure if it's not even an object. + let Variant::Object(value) = value else { + // TODO: handle casting failure + self.append_null()?; + return Ok(false); + }; + + // Process each field. If the field is missing, it becomes NULL. If the field is present, + // the child builder handles it from there, and a failed cast could produce NULL or error. + // + // TODO: This loop costs `O(m lg n)` where `m` is the number of fields in this builder and + // `n` is the number of fields in the variant object we're probing. Given that `m` and `n` + // could both be large -- indepentently of each other -- we should consider doing something + // more clever that bounds the cost to O(m + n). 
+ for (field, builder) in &mut self.field_builders { + match value.get(field.name()) { + None => builder.append_null()?, + Some(v) => { + builder.append_value(&v)?; + } + } + } + self.nulls.append_non_null(); + Ok(true) + } + + fn finish(&mut self) -> Result<ArrayRef> { + let mut fields = Vec::with_capacity(self.field_builders.len()); + let mut arrays = Vec::with_capacity(self.field_builders.len()); + for (field, mut builder) in std::mem::take(&mut self.field_builders) { + fields.push(field); + arrays.push(builder.finish()?); + } + Ok(Arc::new(arrow::array::StructArray::try_new( + fields.into(), + arrays, + self.nulls.finish(), + )?)) + } +} + +/// Used for actual shredding of binary variant values into shredded variant values +#[allow(unused)] +struct ShreddedVariantRowBuilder { Review Comment: This implementation is very incomplete, but hopefully has enough of a sketch to give an idea of what the implementation should look like? Solving it requires a way of throwing around variant bytes, so e.g. we can iterate over a stream of variant fields and filter out the unwanted ones to build a new variant object. ########## parquet-variant-compute/src/variant_array.rs: ########## @@ -229,107 +328,138 @@ pub enum ShreddingState { // TODO: add missing state where there is neither value nor typed_value // Missing { metadata: BinaryViewArray }, /// This variant has no typed_value field - Unshredded { - metadata: BinaryViewArray, - value: BinaryViewArray, - }, + Unshredded { value: BinaryViewArray }, /// This variant has a typed_value field and no value field /// meaning it is the shredded type - Typed { - metadata: BinaryViewArray, - typed_value: ArrayRef, - }, - /// Partially shredded: - /// * value is an object - /// * typed_value is a shredded object. + PerfectlyShredded { typed_value: ArrayRef }, Review Comment: Actually, NULL handling seems to be a bit more complicated than that... If it's a top-level variant, e.g. 
`v::INT`, then `(value=NULL, typed_value=NULL)` is an illegal combination. A SQL NULL value should be encoded as`(value=Variant::Null, typed_value=NULL)`: <img width="551" height="161" alt="image" src="https://github.com/user-attachments/assets/14687b81-6b0a-4794-aa1a-decffe2c8ada" /> But if it's a nested variant field, e.g. `v:x::INT`, then `(value=NULL, typed_value=NULL)` is the correct way to encode a SQL NULL ("missing value") and `(value=Variant::Null, typed_value=NULL)` encodes a `Variant::Null` value (for which SQL `IS NULL` returns `FALSE`): <img width="665" height="86" alt="image" src="https://github.com/user-attachments/assets/ee9f026d-10fa-4be6-a3cc-b25c6f06c3bc" /> Not sure if it could be considered correct to always use the top-level encoding, and accept that missing nested fields come back as `Variant::Null` instead of SQL NULL? Probably not, given that this really follows the structure of JSON -- at top level, you can do `{}` (empty object) or `""` (empty string literal) or `null` (JSON null is present and therefore not SQL NULL). There's no way to encode the notion of a SQL NULL JSON literal in JSON itself. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org