alamb commented on code in PR #8481: URL: https://github.com/apache/arrow-rs/pull/8481#discussion_r2392628784
########## parquet-variant-compute/src/unshred_variant.rs: ########## @@ -0,0 +1,520 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Module for unshredding VariantArray by folding typed_value columns back into the value column. + +use crate::{BorrowedShreddingState, VariantArray, VariantValueArrayBuilder}; +use arrow::array::{ + Array, AsArray as _, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, NullBufferBuilder, + PrimitiveArray, StringArray, StructArray, +}; +use arrow::buffer::NullBuffer; +use arrow::datatypes::{ + ArrowPrimitiveType, DataType, Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, + Int64Type, Int8Type, Time64MicrosecondType, TimeUnit, TimestampMicrosecondType, + TimestampNanosecondType, +}; +use arrow::error::{ArrowError, Result}; +use arrow::temporal_conversions::time64us_to_time; +use chrono::{DateTime, Utc}; +use indexmap::IndexMap; +use parquet_variant::{ObjectFieldBuilder, Variant, VariantBuilderExt, VariantMetadata}; +use uuid::Uuid; + +/// Removes all (nested) typed_value columns from a VariantArray by converting them back to binary +/// variant and merging the resulting values back into the value column. 
+/// +/// This function efficiently converts a shredded VariantArray back to an unshredded form where all +/// data resides in the value column. +/// +/// # Arguments +/// * `array` - The VariantArray to unshred +/// +/// # Returns +/// A new VariantArray with all data in the value column and no typed_value column +/// +/// # Errors +/// - If the shredded data contains spec violations (e.g., field name conflicts) +/// - If unsupported data types are encountered in typed_value columns +pub fn unshred_variant(array: &VariantArray) -> Result<VariantArray> { + // Check if already unshredded (optimization for common case) + if array.typed_value_field().is_none() && array.value_field().is_some() { + return Ok(array.clone()); + } + + // NOTE: None/None at top-level is technically invalid, but the shredding spec requires us to + // emit `Variant::Null` when a required value is missing. + let mut row_builder = make_unshred_variant_row_builder(array.shredding_state().borrow())? + .unwrap_or_else(|| UnshredVariantRowBuilder::null(array.nulls())); + + let metadata = array.metadata_field(); + let mut value_builder = VariantValueArrayBuilder::new(array.len()); + let mut null_builder = NullBufferBuilder::new(array.len()); + for i in 0..array.len() { + if array.is_null(i) { + value_builder.append_null(); + null_builder.append_null(); Review Comment: As a follow-on PR/optimization, I think we can just clone `array.null_buffer()` rather than rebuilding the same buffer again row by row via `null_builder`. ########## parquet-variant-compute/src/unshred_variant.rs: ########## @@ -0,0 +1,520 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Module for unshredding VariantArray by folding typed_value columns back into the value column. + +use crate::{BorrowedShreddingState, VariantArray, VariantValueArrayBuilder}; +use arrow::array::{ + Array, AsArray as _, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, NullBufferBuilder, + PrimitiveArray, StringArray, StructArray, +}; +use arrow::buffer::NullBuffer; +use arrow::datatypes::{ + ArrowPrimitiveType, DataType, Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, + Int64Type, Int8Type, Time64MicrosecondType, TimeUnit, TimestampMicrosecondType, + TimestampNanosecondType, +}; +use arrow::error::{ArrowError, Result}; +use arrow::temporal_conversions::time64us_to_time; +use chrono::{DateTime, Utc}; +use indexmap::IndexMap; +use parquet_variant::{ObjectFieldBuilder, Variant, VariantBuilderExt, VariantMetadata}; +use uuid::Uuid; + +/// Removes all (nested) typed_value columns from a VariantArray by converting them back to binary +/// variant and merging the resulting values back into the value column. +/// +/// This function efficiently converts a shredded VariantArray back to an unshredded form where all +/// data resides in the value column. 
+/// +/// # Arguments +/// * `array` - The VariantArray to unshred +/// +/// # Returns +/// A new VariantArray with all data in the value column and no typed_value column +/// +/// # Errors +/// - If the shredded data contains spec violations (e.g., field name conflicts) +/// - If unsupported data types are encountered in typed_value columns +pub fn unshred_variant(array: &VariantArray) -> Result<VariantArray> { + // Check if already unshredded (optimization for common case) + if array.typed_value_field().is_none() && array.value_field().is_some() { + return Ok(array.clone()); + } + + // NOTE: None/None at top-level is technically invalid, but the shredding spec requires us to + // emit `Variant::Null` when a required value is missing. + let mut row_builder = make_unshred_variant_row_builder(array.shredding_state().borrow())? + .unwrap_or_else(|| UnshredVariantRowBuilder::null(array.nulls())); + + let metadata = array.metadata_field(); + let mut value_builder = VariantValueArrayBuilder::new(array.len()); + let mut null_builder = NullBufferBuilder::new(array.len()); + for i in 0..array.len() { + if array.is_null(i) { + value_builder.append_null(); + null_builder.append_null(); + } else { + let metadata = VariantMetadata::new(metadata.value(i)); + let mut value_builder = value_builder.builder_ext(&metadata); + row_builder.append_row(&mut value_builder, &metadata, i)?; + null_builder.append_non_null(); + } + } + + let value = value_builder.build()?; + Ok(VariantArray::from_parts( + metadata.clone(), + Some(value), + None, + null_builder.finish(), + )) +} + +/// Row builder for converting shredded VariantArray rows back to unshredded form +enum UnshredVariantRowBuilder<'a> { + PrimitiveInt8(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int8Type>>), + PrimitiveInt16(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int16Type>>), + PrimitiveInt32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int32Type>>), + PrimitiveInt64(UnshredPrimitiveRowBuilder<'a, 
PrimitiveArray<Int64Type>>), + PrimitiveFloat32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Float32Type>>), + PrimitiveFloat64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Float64Type>>), + PrimitiveDate32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Date32Type>>), + PrimitiveTime64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Time64MicrosecondType>>), + TimestampMicrosecond(TimestampUnshredRowBuilder<'a, TimestampMicrosecondType>), + TimestampNanosecond(TimestampUnshredRowBuilder<'a, TimestampNanosecondType>), + PrimitiveBoolean(UnshredPrimitiveRowBuilder<'a, BooleanArray>), + PrimitiveString(UnshredPrimitiveRowBuilder<'a, StringArray>), + PrimitiveBinaryView(UnshredPrimitiveRowBuilder<'a, BinaryViewArray>), + PrimitiveUuid(UnshredPrimitiveRowBuilder<'a, FixedSizeBinaryArray>), + Struct(StructUnshredVariantBuilder<'a>), + ValueOnly(ValueOnlyUnshredVariantBuilder<'a>), + Null(NullUnshredVariantBuilder<'a>), +} + +impl<'a> UnshredVariantRowBuilder<'a> { + /// Creates an all-null row builder. + fn null(nulls: Option<&'a NullBuffer>) -> Self { + Self::Null(NullUnshredVariantBuilder::new(nulls)) + } + + /// Appends a single row at the given value index to the supplied builder. 
+ fn append_row( + &mut self, + builder: &mut impl VariantBuilderExt, + metadata: &VariantMetadata, + index: usize, + ) -> Result<()> { + match self { + Self::PrimitiveInt8(b) => b.append_row(builder, metadata, index), + Self::PrimitiveInt16(b) => b.append_row(builder, metadata, index), + Self::PrimitiveInt32(b) => b.append_row(builder, metadata, index), + Self::PrimitiveInt64(b) => b.append_row(builder, metadata, index), + Self::PrimitiveFloat32(b) => b.append_row(builder, metadata, index), + Self::PrimitiveFloat64(b) => b.append_row(builder, metadata, index), + Self::PrimitiveDate32(b) => b.append_row(builder, metadata, index), + Self::PrimitiveTime64(b) => b.append_row(builder, metadata, index), + Self::TimestampMicrosecond(b) => b.append_row(builder, metadata, index), + Self::TimestampNanosecond(b) => b.append_row(builder, metadata, index), + Self::PrimitiveBoolean(b) => b.append_row(builder, metadata, index), + Self::PrimitiveString(b) => b.append_row(builder, metadata, index), + Self::PrimitiveBinaryView(b) => b.append_row(builder, metadata, index), + Self::PrimitiveUuid(b) => b.append_row(builder, metadata, index), + Self::Struct(b) => b.append_row(builder, metadata, index), + Self::ValueOnly(b) => b.append_row(builder, metadata, index), + Self::Null(b) => b.append_row(builder, metadata, index), + } + } +} + +/// Factory function to create the appropriate row builder for given field components +/// Returns None for None/None case - caller decides how to handle based on context +fn make_unshred_variant_row_builder<'a>( Review Comment: Minor, this might make more sense as a constructor of `UnshredVariantRowBuilder` like ```rust impl UnshredVariantRowBuilder<'a> { fn try_new(shredding_state: BorrowedShreddingState<'a>) -> Self { ... 
} } ``` ########## parquet/tests/variant_integration.rs: ########## @@ -274,13 +291,11 @@ impl VariantTestCase { let variant_data = self.load_variants(); let variant_array = self.load_parquet(); + let variant_array = unshred_variant(&variant_array).unwrap(); Review Comment: Some context might help: ```suggestion // The expected values are provided as unshredded variants, so // unshred the array here prior to comparison let variant_array = unshred_variant(&variant_array).unwrap(); ``` ########## parquet-variant-compute/src/unshred_variant.rs: ########## @@ -0,0 +1,520 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Module for unshredding VariantArray by folding typed_value columns back into the value column. 
+ +use crate::{BorrowedShreddingState, VariantArray, VariantValueArrayBuilder}; +use arrow::array::{ + Array, AsArray as _, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, NullBufferBuilder, + PrimitiveArray, StringArray, StructArray, +}; +use arrow::buffer::NullBuffer; +use arrow::datatypes::{ + ArrowPrimitiveType, DataType, Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, + Int64Type, Int8Type, Time64MicrosecondType, TimeUnit, TimestampMicrosecondType, + TimestampNanosecondType, +}; +use arrow::error::{ArrowError, Result}; +use arrow::temporal_conversions::time64us_to_time; +use chrono::{DateTime, Utc}; +use indexmap::IndexMap; +use parquet_variant::{ObjectFieldBuilder, Variant, VariantBuilderExt, VariantMetadata}; +use uuid::Uuid; + +/// Removes all (nested) typed_value columns from a VariantArray by converting them back to binary +/// variant and merging the resulting values back into the value column. +/// +/// This function efficiently converts a shredded VariantArray back to an unshredded form where all +/// data resides in the value column. +/// +/// # Arguments +/// * `array` - The VariantArray to unshred +/// +/// # Returns +/// A new VariantArray with all data in the value column and no typed_value column +/// +/// # Errors +/// - If the shredded data contains spec violations (e.g., field name conflicts) +/// - If unsupported data types are encountered in typed_value columns +pub fn unshred_variant(array: &VariantArray) -> Result<VariantArray> { + // Check if already unshredded (optimization for common case) + if array.typed_value_field().is_none() && array.value_field().is_some() { + return Ok(array.clone()); + } + + // NOTE: None/None at top-level is technically invalid, but the shredding spec requires us to + // emit `Variant::Null` when a required value is missing. + let mut row_builder = make_unshred_variant_row_builder(array.shredding_state().borrow())? 
+ .unwrap_or_else(|| UnshredVariantRowBuilder::null(array.nulls())); + + let metadata = array.metadata_field(); + let mut value_builder = VariantValueArrayBuilder::new(array.len()); + let mut null_builder = NullBufferBuilder::new(array.len()); + for i in 0..array.len() { + if array.is_null(i) { + value_builder.append_null(); + null_builder.append_null(); + } else { + let metadata = VariantMetadata::new(metadata.value(i)); + let mut value_builder = value_builder.builder_ext(&metadata); + row_builder.append_row(&mut value_builder, &metadata, i)?; + null_builder.append_non_null(); + } + } + + let value = value_builder.build()?; + Ok(VariantArray::from_parts( + metadata.clone(), + Some(value), + None, + null_builder.finish(), + )) +} + +/// Row builder for converting shredded VariantArray rows back to unshredded form +enum UnshredVariantRowBuilder<'a> { + PrimitiveInt8(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int8Type>>), + PrimitiveInt16(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int16Type>>), + PrimitiveInt32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int32Type>>), + PrimitiveInt64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int64Type>>), + PrimitiveFloat32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Float32Type>>), + PrimitiveFloat64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Float64Type>>), + PrimitiveDate32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Date32Type>>), + PrimitiveTime64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Time64MicrosecondType>>), + TimestampMicrosecond(TimestampUnshredRowBuilder<'a, TimestampMicrosecondType>), + TimestampNanosecond(TimestampUnshredRowBuilder<'a, TimestampNanosecondType>), + PrimitiveBoolean(UnshredPrimitiveRowBuilder<'a, BooleanArray>), + PrimitiveString(UnshredPrimitiveRowBuilder<'a, StringArray>), + PrimitiveBinaryView(UnshredPrimitiveRowBuilder<'a, BinaryViewArray>), + PrimitiveUuid(UnshredPrimitiveRowBuilder<'a, FixedSizeBinaryArray>), + Struct(StructUnshredVariantBuilder<'a>), + 
ValueOnly(ValueOnlyUnshredVariantBuilder<'a>), + Null(NullUnshredVariantBuilder<'a>), +} + +impl<'a> UnshredVariantRowBuilder<'a> { + /// Creates an all-null row builder. + fn null(nulls: Option<&'a NullBuffer>) -> Self { + Self::Null(NullUnshredVariantBuilder::new(nulls)) + } + + /// Appends a single row at the given value index to the supplied builder. + fn append_row( + &mut self, + builder: &mut impl VariantBuilderExt, + metadata: &VariantMetadata, + index: usize, + ) -> Result<()> { Review Comment: I don't know how much it matters, but you could avoid the dynamic dispatch here with a generic function: ```suggestion fn append_row<T: VariantBuilderExt>( &mut self, builder: &mut T, metadata: &VariantMetadata, index: usize, ) -> Result<()> { ``` ########## parquet-variant-compute/src/unshred_variant.rs: ########## @@ -0,0 +1,520 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Module for unshredding VariantArray by folding typed_value columns back into the value column. 
+ +use crate::{BorrowedShreddingState, VariantArray, VariantValueArrayBuilder}; +use arrow::array::{ + Array, AsArray as _, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, NullBufferBuilder, + PrimitiveArray, StringArray, StructArray, +}; +use arrow::buffer::NullBuffer; +use arrow::datatypes::{ + ArrowPrimitiveType, DataType, Date32Type, Float32Type, Float64Type, Int16Type, Int32Type, + Int64Type, Int8Type, Time64MicrosecondType, TimeUnit, TimestampMicrosecondType, + TimestampNanosecondType, +}; +use arrow::error::{ArrowError, Result}; +use arrow::temporal_conversions::time64us_to_time; +use chrono::{DateTime, Utc}; +use indexmap::IndexMap; +use parquet_variant::{ObjectFieldBuilder, Variant, VariantBuilderExt, VariantMetadata}; +use uuid::Uuid; + +/// Removes all (nested) typed_value columns from a VariantArray by converting them back to binary +/// variant and merging the resulting values back into the value column. +/// +/// This function efficiently converts a shredded VariantArray back to an unshredded form where all +/// data resides in the value column. +/// +/// # Arguments +/// * `array` - The VariantArray to unshred +/// +/// # Returns +/// A new VariantArray with all data in the value column and no typed_value column +/// +/// # Errors +/// - If the shredded data contains spec violations (e.g., field name conflicts) +/// - If unsupported data types are encountered in typed_value columns +pub fn unshred_variant(array: &VariantArray) -> Result<VariantArray> { + // Check if already unshredded (optimization for common case) + if array.typed_value_field().is_none() && array.value_field().is_some() { + return Ok(array.clone()); + } + + // NOTE: None/None at top-level is technically invalid, but the shredding spec requires us to + // emit `Variant::Null` when a required value is missing. + let mut row_builder = make_unshred_variant_row_builder(array.shredding_state().borrow())? 
+ .unwrap_or_else(|| UnshredVariantRowBuilder::null(array.nulls())); + + let metadata = array.metadata_field(); + let mut value_builder = VariantValueArrayBuilder::new(array.len()); + let mut null_builder = NullBufferBuilder::new(array.len()); + for i in 0..array.len() { + if array.is_null(i) { + value_builder.append_null(); + null_builder.append_null(); + } else { + let metadata = VariantMetadata::new(metadata.value(i)); + let mut value_builder = value_builder.builder_ext(&metadata); + row_builder.append_row(&mut value_builder, &metadata, i)?; + null_builder.append_non_null(); + } + } + + let value = value_builder.build()?; + Ok(VariantArray::from_parts( + metadata.clone(), + Some(value), + None, + null_builder.finish(), + )) +} + +/// Row builder for converting shredded VariantArray rows back to unshredded form +enum UnshredVariantRowBuilder<'a> { + PrimitiveInt8(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int8Type>>), + PrimitiveInt16(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int16Type>>), + PrimitiveInt32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int32Type>>), + PrimitiveInt64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int64Type>>), + PrimitiveFloat32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Float32Type>>), + PrimitiveFloat64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Float64Type>>), + PrimitiveDate32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Date32Type>>), + PrimitiveTime64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Time64MicrosecondType>>), + TimestampMicrosecond(TimestampUnshredRowBuilder<'a, TimestampMicrosecondType>), + TimestampNanosecond(TimestampUnshredRowBuilder<'a, TimestampNanosecondType>), + PrimitiveBoolean(UnshredPrimitiveRowBuilder<'a, BooleanArray>), + PrimitiveString(UnshredPrimitiveRowBuilder<'a, StringArray>), + PrimitiveBinaryView(UnshredPrimitiveRowBuilder<'a, BinaryViewArray>), + PrimitiveUuid(UnshredPrimitiveRowBuilder<'a, FixedSizeBinaryArray>), + Struct(StructUnshredVariantBuilder<'a>), + 
ValueOnly(ValueOnlyUnshredVariantBuilder<'a>), + Null(NullUnshredVariantBuilder<'a>), +} + +impl<'a> UnshredVariantRowBuilder<'a> { + /// Creates an all-null row builder. + fn null(nulls: Option<&'a NullBuffer>) -> Self { + Self::Null(NullUnshredVariantBuilder::new(nulls)) + } + + /// Appends a single row at the given value index to the supplied builder. + fn append_row( + &mut self, + builder: &mut impl VariantBuilderExt, + metadata: &VariantMetadata, + index: usize, + ) -> Result<()> { + match self { + Self::PrimitiveInt8(b) => b.append_row(builder, metadata, index), + Self::PrimitiveInt16(b) => b.append_row(builder, metadata, index), + Self::PrimitiveInt32(b) => b.append_row(builder, metadata, index), + Self::PrimitiveInt64(b) => b.append_row(builder, metadata, index), + Self::PrimitiveFloat32(b) => b.append_row(builder, metadata, index), + Self::PrimitiveFloat64(b) => b.append_row(builder, metadata, index), + Self::PrimitiveDate32(b) => b.append_row(builder, metadata, index), + Self::PrimitiveTime64(b) => b.append_row(builder, metadata, index), + Self::TimestampMicrosecond(b) => b.append_row(builder, metadata, index), + Self::TimestampNanosecond(b) => b.append_row(builder, metadata, index), + Self::PrimitiveBoolean(b) => b.append_row(builder, metadata, index), + Self::PrimitiveString(b) => b.append_row(builder, metadata, index), + Self::PrimitiveBinaryView(b) => b.append_row(builder, metadata, index), + Self::PrimitiveUuid(b) => b.append_row(builder, metadata, index), + Self::Struct(b) => b.append_row(builder, metadata, index), + Self::ValueOnly(b) => b.append_row(builder, metadata, index), + Self::Null(b) => b.append_row(builder, metadata, index), + } + } +} + +/// Factory function to create the appropriate row builder for given field components +/// Returns None for None/None case - caller decides how to handle based on context +fn make_unshred_variant_row_builder<'a>( + shredding_state: BorrowedShreddingState<'a>, +) -> 
Result<Option<UnshredVariantRowBuilder<'a>>> { + let value = shredding_state.value_field(); + let typed_value = shredding_state.typed_value_field(); + let Some(typed_value) = typed_value else { + // Copy the value across directly, if present. Else caller decides what to do. + return Ok(value + .map(|v| UnshredVariantRowBuilder::ValueOnly(ValueOnlyUnshredVariantBuilder::new(v)))); + }; + + // Has typed_value -> determine type and create appropriate builder + macro_rules! primitive_builder { + ($enum_variant:ident, $cast_fn:ident) => { + UnshredVariantRowBuilder::$enum_variant(UnshredPrimitiveRowBuilder::new( + value, + typed_value.$cast_fn(), + )) + }; + } + + let builder = match typed_value.data_type() { + DataType::Int8 => primitive_builder!(PrimitiveInt8, as_primitive), + DataType::Int16 => primitive_builder!(PrimitiveInt16, as_primitive), + DataType::Int32 => primitive_builder!(PrimitiveInt32, as_primitive), + DataType::Int64 => primitive_builder!(PrimitiveInt64, as_primitive), + DataType::Float32 => primitive_builder!(PrimitiveFloat32, as_primitive), + DataType::Float64 => primitive_builder!(PrimitiveFloat64, as_primitive), + DataType::Date32 => primitive_builder!(PrimitiveDate32, as_primitive), + DataType::Time64(TimeUnit::Microsecond) => { + primitive_builder!(PrimitiveTime64, as_primitive) + } + DataType::Time64(time_unit) => { + return Err(ArrowError::InvalidArgumentError(format!( + "Time64({time_unit}) is not a valid variant shredding type", + ))); + } + DataType::Timestamp(TimeUnit::Microsecond, timezone) => { + UnshredVariantRowBuilder::TimestampMicrosecond(TimestampUnshredRowBuilder::new( + value, + typed_value.as_primitive(), + timezone.is_some(), + )) + } + DataType::Timestamp(TimeUnit::Nanosecond, timezone) => { + UnshredVariantRowBuilder::TimestampNanosecond(TimestampUnshredRowBuilder::new( + value, + typed_value.as_primitive(), + timezone.is_some(), + )) + } + DataType::Timestamp(time_unit, _) => { + return 
Err(ArrowError::InvalidArgumentError(format!( + "Timestamp({time_unit}) is not a valid variant shredding type", + ))); + } + DataType::Boolean => primitive_builder!(PrimitiveBoolean, as_boolean), + DataType::Utf8 => primitive_builder!(PrimitiveString, as_string), + DataType::BinaryView => primitive_builder!(PrimitiveBinaryView, as_binary_view), + DataType::FixedSizeBinary(16) => primitive_builder!(PrimitiveUuid, as_fixed_size_binary), + DataType::FixedSizeBinary(size) => { + return Err(ArrowError::InvalidArgumentError(format!( + "FixedSizeBinary({size}) is not a valid variant shredding type", + ))); + } + DataType::Struct(_) => UnshredVariantRowBuilder::Struct( + StructUnshredVariantBuilder::try_new(value, typed_value.as_struct())?, + ), + _ => { + return Err(ArrowError::NotYetImplemented(format!( + "Unshredding not yet supported for type: {}", + typed_value.data_type() + ))); + } + }; + Ok(Some(builder)) +} + +/// Builder for arrays with neither typed_value nor value (all NULL/Variant::Null) +struct NullUnshredVariantBuilder<'a> { + nulls: Option<&'a NullBuffer>, +} + +impl<'a> NullUnshredVariantBuilder<'a> { + fn new(nulls: Option<&'a NullBuffer>) -> Self { + Self { nulls } + } + + fn append_row( + &mut self, + builder: &mut impl VariantBuilderExt, + _metadata: &VariantMetadata, + index: usize, + ) -> Result<()> { + if self.nulls.is_some_and(|nulls| nulls.is_null(index)) { + builder.append_null(); + } else { + builder.append_value(Variant::Null); + } + Ok(()) + } +} + +/// Builder for arrays that only have value column (already unshredded) +struct ValueOnlyUnshredVariantBuilder<'a> { + value: &'a arrow::array::BinaryViewArray, +} + +impl<'a> ValueOnlyUnshredVariantBuilder<'a> { + fn new(value: &'a BinaryViewArray) -> Self { + Self { value } + } + + fn append_row( + &mut self, + builder: &mut impl VariantBuilderExt, + metadata: &VariantMetadata, + index: usize, + ) -> Result<()> { + if self.value.is_null(index) { + builder.append_null(); + } else { + let variant = 
Variant::new_with_metadata(metadata.clone(), self.value.value(index)); + builder.append_value(variant); + } + Ok(()) + } +} + +/// Extension trait that directly adds row builder support for arrays that correspond to primitive +/// variant types. +trait PrimitiveVariantUnshredExt { + fn append_to_variant_builder( + &self, + builder: &mut impl VariantBuilderExt, + index: usize, + ) -> Result<()>; +} + +/// Macro that handles the common unshredded case and returns early if handled. +/// If not handled (shredded case), validates and returns the extracted value. +macro_rules! handle_unshredded_case { + ($self:expr, $builder:expr, $metadata:expr, $index:expr, $partial_shredding:expr) => {{ + let value = $self.value.as_ref().filter(|v| v.is_valid($index)); + let value = value.map(|v| Variant::new_with_metadata($metadata.clone(), v.value($index))); + + // If typed_value is null, handle unshredded case and return early + if $self.typed_value.is_null($index) { + match value { + Some(value) => $builder.append_value(value), + None => $builder.append_null(), + } + return Ok(()); + } + + // Only partial shredding allows value and typed_value to both be non-NULL + if !$partial_shredding && value.is_some() { + return Err(ArrowError::InvalidArgumentError( + "Invalid shredded variant: both value and typed_value are non-null".to_string(), + )); + } + + // Return the extracted value for the partial shredded case + value + }}; +} + +/// Generic unshred builder that works with any typed array implementing VariantUnshredExt +struct UnshredPrimitiveRowBuilder<'a, T> { + value: Option<&'a BinaryViewArray>, + typed_value: &'a T, +} + +impl<'a, T: Array + PrimitiveVariantUnshredExt> UnshredPrimitiveRowBuilder<'a, T> { + fn new(value: Option<&'a BinaryViewArray>, typed_value: &'a T) -> Self { + Self { value, typed_value } + } + + fn append_row( + &mut self, + builder: &mut impl VariantBuilderExt, + metadata: &VariantMetadata, + index: usize, + ) -> Result<()> { + handle_unshredded_case!(self, 
builder, metadata, index, false); + + // If we get here, typed_value is valid and value is NULL + self.typed_value.append_to_variant_builder(builder, index) + } +} + +// Macro to generate VariantUnshredExt implementations with optional value transformation +macro_rules! impl_variant_unshred { + ($array_type:ty $(, |$v:ident| $transform:expr)? ) => { + impl PrimitiveVariantUnshredExt for $array_type { + fn append_to_variant_builder( + &self, + builder: &mut impl VariantBuilderExt, + index: usize, + ) -> Result<()> { + let value = self.value(index); + $( + let $v = value; + let value = $transform; + )? + builder.append_value(value); + Ok(()) + } + } + }; +} + +impl_variant_unshred!(BooleanArray); +impl_variant_unshred!(StringArray); +impl_variant_unshred!(BinaryViewArray); +impl_variant_unshred!(PrimitiveArray<Int8Type>); +impl_variant_unshred!(PrimitiveArray<Int16Type>); +impl_variant_unshred!(PrimitiveArray<Int32Type>); +impl_variant_unshred!(PrimitiveArray<Int64Type>); +impl_variant_unshred!(PrimitiveArray<Float32Type>); +impl_variant_unshred!(PrimitiveArray<Float64Type>); + +impl_variant_unshred!(PrimitiveArray<Date32Type>, |days_since_epoch| { + Date32Type::to_naive_date(days_since_epoch) +}); + +impl_variant_unshred!( + PrimitiveArray<Time64MicrosecondType>, + |micros_since_midnight| { + time64us_to_time(micros_since_midnight).ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "Invalid Time64 microsecond value: {micros_since_midnight}" + )) + })? 
+ } +); + +// UUID from FixedSizeBinary(16) +// NOTE: FixedSizeBinaryArray guarantees the byte length, so we can safely unwrap +impl_variant_unshred!(FixedSizeBinaryArray, |bytes| { + Uuid::from_slice(bytes).unwrap() +}); + +/// Trait for timestamp types to handle conversion to `DateTime<Utc>` +trait TimestampType: ArrowPrimitiveType<Native = i64> { + fn to_datetime_utc(value: i64) -> Result<DateTime<Utc>>; +} + +impl TimestampType for TimestampMicrosecondType { + fn to_datetime_utc(micros: i64) -> Result<DateTime<Utc>> { + DateTime::from_timestamp_micros(micros).ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "Invalid timestamp microsecond value: {micros}" + )) + }) + } +} + +impl TimestampType for TimestampNanosecondType { + fn to_datetime_utc(nanos: i64) -> Result<DateTime<Utc>> { + Ok(DateTime::from_timestamp_nanos(nanos)) + } +} + +/// Generic builder for timestamp types that handles timezone-aware conversion +struct TimestampUnshredRowBuilder<'a, T: TimestampType> { + value: Option<&'a BinaryViewArray>, + typed_value: &'a PrimitiveArray<T>, + has_timezone: bool, +} + +impl<'a, T: TimestampType> TimestampUnshredRowBuilder<'a, T> { + fn new( + value: Option<&'a BinaryViewArray>, + typed_value: &'a PrimitiveArray<T>, + has_timezone: bool, + ) -> Self { + Self { + value, + typed_value, + has_timezone, + } + } + + fn append_row( + &mut self, + builder: &mut impl VariantBuilderExt, + metadata: &VariantMetadata, + index: usize, + ) -> Result<()> { + handle_unshredded_case!(self, builder, metadata, index, false); + + // If we get here, typed_value is valid and value is NULL + let timestamp_value = self.typed_value.value(index); + let dt = T::to_datetime_utc(timestamp_value)?; + if self.has_timezone { + builder.append_value(dt); + } else { + builder.append_value(dt.naive_utc()); + } + Ok(()) + } +} + +/// Builder for unshredding struct/object types with nested fields +struct StructUnshredVariantBuilder<'a> { + value: Option<&'a 
arrow::array::BinaryViewArray>, + typed_value: &'a arrow::array::StructArray, + field_unshredders: IndexMap<&'a str, Option<UnshredVariantRowBuilder<'a>>>, +} + +impl<'a> StructUnshredVariantBuilder<'a> { + fn try_new(value: Option<&'a BinaryViewArray>, typed_value: &'a StructArray) -> Result<Self> { + // Create unshredders for each field in constructor + let mut field_unshredders = IndexMap::new(); + for (field, field_array) in typed_value.fields().iter().zip(typed_value.columns()) { + // Factory returns None for None/None case -- these are missing fields we should skip + let Some(field_array) = field_array.as_struct_opt() else { + return Err(ArrowError::InvalidArgumentError(format!( + "Invalid shredded variant object field: expected Struct, got {}", + field_array.data_type() + ))); + }; + let field_unshredder = make_unshred_variant_row_builder(field_array.try_into()?)?; + field_unshredders.insert(field.name().as_ref(), field_unshredder); + } + + Ok(Self { + value, + typed_value, + field_unshredders, + }) + } + + fn append_row( + &mut self, + builder: &mut impl VariantBuilderExt, + metadata: &VariantMetadata, + index: usize, + ) -> Result<()> { + let value = handle_unshredded_case!(self, builder, metadata, index, true); + + // If we get here, typed_value is valid and value may or may not be valid + let mut object_builder = builder.try_new_object()?; + + // Process typed fields (skip empty builders that indicate missing fields) + for (field_name, field_unshredder_opt) in &mut self.field_unshredders { + if let Some(field_unshredder) = field_unshredder_opt { + let mut field_builder = ObjectFieldBuilder::new(field_name, &mut object_builder); + field_unshredder.append_row(&mut field_builder, metadata, index)?; + } + } + + // Process any unshredded fields (partial shredding) + if let Some(value) = value { + let Variant::Object(object) = value else { + return Err(ArrowError::InvalidArgumentError( + "Expected object in value field for partially shredded 
struct".to_string(), + )); + }; + + for (field_name, field_value) in object.iter() { + if self.field_unshredders.contains_key(field_name) { + return Err(ArrowError::InvalidArgumentError(format!( + "Field '{field_name}' appears in both typed_value and value", + ))); + } + object_builder.insert_bytes(field_name, field_value); + } + } + + object_builder.finish(); + Ok(()) + } +} Review Comment: I was wondering if we should have any tests for this kernel directly. I do realize the tests in `parquet/tests/variant_integration.rs` cover it pretty thoroughly. Maybe we can leave a comment to that effect: ```suggestion } // Note: code is covered by tests in `parquet/tests/variant_integration.rs` ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
