alamb commented on code in PR #8481:
URL: https://github.com/apache/arrow-rs/pull/8481#discussion_r2392628784


##########
parquet-variant-compute/src/unshred_variant.rs:
##########
@@ -0,0 +1,520 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module for unshredding VariantArray by folding typed_value columns back 
into the value column.
+
+use crate::{BorrowedShreddingState, VariantArray, VariantValueArrayBuilder};
+use arrow::array::{
+    Array, AsArray as _, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, 
NullBufferBuilder,
+    PrimitiveArray, StringArray, StructArray,
+};
+use arrow::buffer::NullBuffer;
+use arrow::datatypes::{
+    ArrowPrimitiveType, DataType, Date32Type, Float32Type, Float64Type, 
Int16Type, Int32Type,
+    Int64Type, Int8Type, Time64MicrosecondType, TimeUnit, 
TimestampMicrosecondType,
+    TimestampNanosecondType,
+};
+use arrow::error::{ArrowError, Result};
+use arrow::temporal_conversions::time64us_to_time;
+use chrono::{DateTime, Utc};
+use indexmap::IndexMap;
+use parquet_variant::{ObjectFieldBuilder, Variant, VariantBuilderExt, 
VariantMetadata};
+use uuid::Uuid;
+
+/// Removes all (nested) typed_value columns from a VariantArray by converting 
them back to binary
+/// variant and merging the resulting values back into the value column.
+///
+/// This function efficiently converts a shredded VariantArray back to an 
unshredded form where all
+/// data resides in the value column.
+///
+/// # Arguments
+/// * `array` - The VariantArray to unshred
+///
+/// # Returns
+/// A new VariantArray with all data in the value column and no typed_value 
column
+///
+/// # Errors
+/// - If the shredded data contains spec violations (e.g., field name 
conflicts)
+/// - If unsupported data types are encountered in typed_value columns
+pub fn unshred_variant(array: &VariantArray) -> Result<VariantArray> {
+    // Check if already unshredded (optimization for common case)
+    if array.typed_value_field().is_none() && array.value_field().is_some() {
+        return Ok(array.clone());
+    }
+
+    // NOTE: None/None at top-level is technically invalid, but the shredding 
spec requires us to
+    // emit `Variant::Null` when a required value is missing.
+    let mut row_builder = 
make_unshred_variant_row_builder(array.shredding_state().borrow())?
+        .unwrap_or_else(|| UnshredVariantRowBuilder::null(array.nulls()));
+
+    let metadata = array.metadata_field();
+    let mut value_builder = VariantValueArrayBuilder::new(array.len());
+    let mut null_builder = NullBufferBuilder::new(array.len());
+    for i in 0..array.len() {
+        if array.is_null(i) {
+            value_builder.append_null();
+            null_builder.append_null();

Review Comment:
   as a follow on PR/ optimization, I think we can just 
clone`array.null_buffer()` rather than rebuilding up the same buffer again row 
by row via `null_builder`



##########
parquet-variant-compute/src/unshred_variant.rs:
##########
@@ -0,0 +1,520 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module for unshredding VariantArray by folding typed_value columns back 
into the value column.
+
+use crate::{BorrowedShreddingState, VariantArray, VariantValueArrayBuilder};
+use arrow::array::{
+    Array, AsArray as _, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, 
NullBufferBuilder,
+    PrimitiveArray, StringArray, StructArray,
+};
+use arrow::buffer::NullBuffer;
+use arrow::datatypes::{
+    ArrowPrimitiveType, DataType, Date32Type, Float32Type, Float64Type, 
Int16Type, Int32Type,
+    Int64Type, Int8Type, Time64MicrosecondType, TimeUnit, 
TimestampMicrosecondType,
+    TimestampNanosecondType,
+};
+use arrow::error::{ArrowError, Result};
+use arrow::temporal_conversions::time64us_to_time;
+use chrono::{DateTime, Utc};
+use indexmap::IndexMap;
+use parquet_variant::{ObjectFieldBuilder, Variant, VariantBuilderExt, 
VariantMetadata};
+use uuid::Uuid;
+
+/// Removes all (nested) typed_value columns from a VariantArray by converting 
them back to binary
+/// variant and merging the resulting values back into the value column.
+///
+/// This function efficiently converts a shredded VariantArray back to an 
unshredded form where all
+/// data resides in the value column.
+///
+/// # Arguments
+/// * `array` - The VariantArray to unshred
+///
+/// # Returns
+/// A new VariantArray with all data in the value column and no typed_value 
column
+///
+/// # Errors
+/// - If the shredded data contains spec violations (e.g., field name 
conflicts)
+/// - If unsupported data types are encountered in typed_value columns
+pub fn unshred_variant(array: &VariantArray) -> Result<VariantArray> {
+    // Check if already unshredded (optimization for common case)
+    if array.typed_value_field().is_none() && array.value_field().is_some() {
+        return Ok(array.clone());
+    }
+
+    // NOTE: None/None at top-level is technically invalid, but the shredding 
spec requires us to
+    // emit `Variant::Null` when a required value is missing.
+    let mut row_builder = 
make_unshred_variant_row_builder(array.shredding_state().borrow())?
+        .unwrap_or_else(|| UnshredVariantRowBuilder::null(array.nulls()));
+
+    let metadata = array.metadata_field();
+    let mut value_builder = VariantValueArrayBuilder::new(array.len());
+    let mut null_builder = NullBufferBuilder::new(array.len());
+    for i in 0..array.len() {
+        if array.is_null(i) {
+            value_builder.append_null();
+            null_builder.append_null();
+        } else {
+            let metadata = VariantMetadata::new(metadata.value(i));
+            let mut value_builder = value_builder.builder_ext(&metadata);
+            row_builder.append_row(&mut value_builder, &metadata, i)?;
+            null_builder.append_non_null();
+        }
+    }
+
+    let value = value_builder.build()?;
+    Ok(VariantArray::from_parts(
+        metadata.clone(),
+        Some(value),
+        None,
+        null_builder.finish(),
+    ))
+}
+
+/// Row builder for converting shredded VariantArray rows back to unshredded 
form
+enum UnshredVariantRowBuilder<'a> {
+    PrimitiveInt8(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int8Type>>),
+    PrimitiveInt16(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int16Type>>),
+    PrimitiveInt32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int32Type>>),
+    PrimitiveInt64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int64Type>>),
+    PrimitiveFloat32(UnshredPrimitiveRowBuilder<'a, 
PrimitiveArray<Float32Type>>),
+    PrimitiveFloat64(UnshredPrimitiveRowBuilder<'a, 
PrimitiveArray<Float64Type>>),
+    PrimitiveDate32(UnshredPrimitiveRowBuilder<'a, 
PrimitiveArray<Date32Type>>),
+    PrimitiveTime64(UnshredPrimitiveRowBuilder<'a, 
PrimitiveArray<Time64MicrosecondType>>),
+    TimestampMicrosecond(TimestampUnshredRowBuilder<'a, 
TimestampMicrosecondType>),
+    TimestampNanosecond(TimestampUnshredRowBuilder<'a, 
TimestampNanosecondType>),
+    PrimitiveBoolean(UnshredPrimitiveRowBuilder<'a, BooleanArray>),
+    PrimitiveString(UnshredPrimitiveRowBuilder<'a, StringArray>),
+    PrimitiveBinaryView(UnshredPrimitiveRowBuilder<'a, BinaryViewArray>),
+    PrimitiveUuid(UnshredPrimitiveRowBuilder<'a, FixedSizeBinaryArray>),
+    Struct(StructUnshredVariantBuilder<'a>),
+    ValueOnly(ValueOnlyUnshredVariantBuilder<'a>),
+    Null(NullUnshredVariantBuilder<'a>),
+}
+
+impl<'a> UnshredVariantRowBuilder<'a> {
+    /// Creates an all-null row builder.
+    fn null(nulls: Option<&'a NullBuffer>) -> Self {
+        Self::Null(NullUnshredVariantBuilder::new(nulls))
+    }
+
+    /// Appends a single row at the given value index to the supplied builder.
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        metadata: &VariantMetadata,
+        index: usize,
+    ) -> Result<()> {
+        match self {
+            Self::PrimitiveInt8(b) => b.append_row(builder, metadata, index),
+            Self::PrimitiveInt16(b) => b.append_row(builder, metadata, index),
+            Self::PrimitiveInt32(b) => b.append_row(builder, metadata, index),
+            Self::PrimitiveInt64(b) => b.append_row(builder, metadata, index),
+            Self::PrimitiveFloat32(b) => b.append_row(builder, metadata, 
index),
+            Self::PrimitiveFloat64(b) => b.append_row(builder, metadata, 
index),
+            Self::PrimitiveDate32(b) => b.append_row(builder, metadata, index),
+            Self::PrimitiveTime64(b) => b.append_row(builder, metadata, index),
+            Self::TimestampMicrosecond(b) => b.append_row(builder, metadata, 
index),
+            Self::TimestampNanosecond(b) => b.append_row(builder, metadata, 
index),
+            Self::PrimitiveBoolean(b) => b.append_row(builder, metadata, 
index),
+            Self::PrimitiveString(b) => b.append_row(builder, metadata, index),
+            Self::PrimitiveBinaryView(b) => b.append_row(builder, metadata, 
index),
+            Self::PrimitiveUuid(b) => b.append_row(builder, metadata, index),
+            Self::Struct(b) => b.append_row(builder, metadata, index),
+            Self::ValueOnly(b) => b.append_row(builder, metadata, index),
+            Self::Null(b) => b.append_row(builder, metadata, index),
+        }
+    }
+}
+
+/// Factory function to create the appropriate row builder for given field 
components
+/// Returns None for None/None case - caller decides how to handle based on 
context
+fn make_unshred_variant_row_builder<'a>(

Review Comment:
   Minor, this might make more sense as a constructor of 
`UnshredVariantRowBuilder` like
   
   ```rust
   impl UnshredVariantRowBuilder<'a> {
       fn try_new(shredding_state: BorrowedShreddingState<'a>) -> Self {
   ...
      }
   }
   ```



##########
parquet/tests/variant_integration.rs:
##########
@@ -274,13 +291,11 @@ impl VariantTestCase {
 
         let variant_data = self.load_variants();
         let variant_array = self.load_parquet();
+        let variant_array = unshred_variant(&variant_array).unwrap();

Review Comment:
   Some context might help: 
   ```suggestion
           // The expected values are provided as unshredded variants, so
           // unshred the array here prior to comparison
           let variant_array = unshred_variant(&variant_array).unwrap();
   ```



##########
parquet-variant-compute/src/unshred_variant.rs:
##########
@@ -0,0 +1,520 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module for unshredding VariantArray by folding typed_value columns back 
into the value column.
+
+use crate::{BorrowedShreddingState, VariantArray, VariantValueArrayBuilder};
+use arrow::array::{
+    Array, AsArray as _, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, 
NullBufferBuilder,
+    PrimitiveArray, StringArray, StructArray,
+};
+use arrow::buffer::NullBuffer;
+use arrow::datatypes::{
+    ArrowPrimitiveType, DataType, Date32Type, Float32Type, Float64Type, 
Int16Type, Int32Type,
+    Int64Type, Int8Type, Time64MicrosecondType, TimeUnit, 
TimestampMicrosecondType,
+    TimestampNanosecondType,
+};
+use arrow::error::{ArrowError, Result};
+use arrow::temporal_conversions::time64us_to_time;
+use chrono::{DateTime, Utc};
+use indexmap::IndexMap;
+use parquet_variant::{ObjectFieldBuilder, Variant, VariantBuilderExt, 
VariantMetadata};
+use uuid::Uuid;
+
+/// Removes all (nested) typed_value columns from a VariantArray by converting 
them back to binary
+/// variant and merging the resulting values back into the value column.
+///
+/// This function efficiently converts a shredded VariantArray back to an 
unshredded form where all
+/// data resides in the value column.
+///
+/// # Arguments
+/// * `array` - The VariantArray to unshred
+///
+/// # Returns
+/// A new VariantArray with all data in the value column and no typed_value 
column
+///
+/// # Errors
+/// - If the shredded data contains spec violations (e.g., field name 
conflicts)
+/// - If unsupported data types are encountered in typed_value columns
+pub fn unshred_variant(array: &VariantArray) -> Result<VariantArray> {
+    // Check if already unshredded (optimization for common case)
+    if array.typed_value_field().is_none() && array.value_field().is_some() {
+        return Ok(array.clone());
+    }
+
+    // NOTE: None/None at top-level is technically invalid, but the shredding 
spec requires us to
+    // emit `Variant::Null` when a required value is missing.
+    let mut row_builder = 
make_unshred_variant_row_builder(array.shredding_state().borrow())?
+        .unwrap_or_else(|| UnshredVariantRowBuilder::null(array.nulls()));
+
+    let metadata = array.metadata_field();
+    let mut value_builder = VariantValueArrayBuilder::new(array.len());
+    let mut null_builder = NullBufferBuilder::new(array.len());
+    for i in 0..array.len() {
+        if array.is_null(i) {
+            value_builder.append_null();
+            null_builder.append_null();
+        } else {
+            let metadata = VariantMetadata::new(metadata.value(i));
+            let mut value_builder = value_builder.builder_ext(&metadata);
+            row_builder.append_row(&mut value_builder, &metadata, i)?;
+            null_builder.append_non_null();
+        }
+    }
+
+    let value = value_builder.build()?;
+    Ok(VariantArray::from_parts(
+        metadata.clone(),
+        Some(value),
+        None,
+        null_builder.finish(),
+    ))
+}
+
+/// Row builder for converting shredded VariantArray rows back to unshredded 
form
+enum UnshredVariantRowBuilder<'a> {
+    PrimitiveInt8(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int8Type>>),
+    PrimitiveInt16(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int16Type>>),
+    PrimitiveInt32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int32Type>>),
+    PrimitiveInt64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int64Type>>),
+    PrimitiveFloat32(UnshredPrimitiveRowBuilder<'a, 
PrimitiveArray<Float32Type>>),
+    PrimitiveFloat64(UnshredPrimitiveRowBuilder<'a, 
PrimitiveArray<Float64Type>>),
+    PrimitiveDate32(UnshredPrimitiveRowBuilder<'a, 
PrimitiveArray<Date32Type>>),
+    PrimitiveTime64(UnshredPrimitiveRowBuilder<'a, 
PrimitiveArray<Time64MicrosecondType>>),
+    TimestampMicrosecond(TimestampUnshredRowBuilder<'a, 
TimestampMicrosecondType>),
+    TimestampNanosecond(TimestampUnshredRowBuilder<'a, 
TimestampNanosecondType>),
+    PrimitiveBoolean(UnshredPrimitiveRowBuilder<'a, BooleanArray>),
+    PrimitiveString(UnshredPrimitiveRowBuilder<'a, StringArray>),
+    PrimitiveBinaryView(UnshredPrimitiveRowBuilder<'a, BinaryViewArray>),
+    PrimitiveUuid(UnshredPrimitiveRowBuilder<'a, FixedSizeBinaryArray>),
+    Struct(StructUnshredVariantBuilder<'a>),
+    ValueOnly(ValueOnlyUnshredVariantBuilder<'a>),
+    Null(NullUnshredVariantBuilder<'a>),
+}
+
+impl<'a> UnshredVariantRowBuilder<'a> {
+    /// Creates an all-null row builder.
+    fn null(nulls: Option<&'a NullBuffer>) -> Self {
+        Self::Null(NullUnshredVariantBuilder::new(nulls))
+    }
+
+    /// Appends a single row at the given value index to the supplied builder.
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        metadata: &VariantMetadata,
+        index: usize,
+    ) -> Result<()> {

Review Comment:
   I don't know how much it matters, but you could avoid the dynamic dispatch 
here with a generic function:
   
   ```suggestion
       fn append_row<T: VariantBuilderExt>(
           &mut self,
           builder: &mut T,
           metadata: &VariantMetadata,
           index: usize,
       ) -> Result<()> {
   ```



##########
parquet-variant-compute/src/unshred_variant.rs:
##########
@@ -0,0 +1,520 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module for unshredding VariantArray by folding typed_value columns back 
into the value column.
+
+use crate::{BorrowedShreddingState, VariantArray, VariantValueArrayBuilder};
+use arrow::array::{
+    Array, AsArray as _, BinaryViewArray, BooleanArray, FixedSizeBinaryArray, 
NullBufferBuilder,
+    PrimitiveArray, StringArray, StructArray,
+};
+use arrow::buffer::NullBuffer;
+use arrow::datatypes::{
+    ArrowPrimitiveType, DataType, Date32Type, Float32Type, Float64Type, 
Int16Type, Int32Type,
+    Int64Type, Int8Type, Time64MicrosecondType, TimeUnit, 
TimestampMicrosecondType,
+    TimestampNanosecondType,
+};
+use arrow::error::{ArrowError, Result};
+use arrow::temporal_conversions::time64us_to_time;
+use chrono::{DateTime, Utc};
+use indexmap::IndexMap;
+use parquet_variant::{ObjectFieldBuilder, Variant, VariantBuilderExt, 
VariantMetadata};
+use uuid::Uuid;
+
+/// Removes all (nested) typed_value columns from a VariantArray by converting 
them back to binary
+/// variant and merging the resulting values back into the value column.
+///
+/// This function efficiently converts a shredded VariantArray back to an 
unshredded form where all
+/// data resides in the value column.
+///
+/// # Arguments
+/// * `array` - The VariantArray to unshred
+///
+/// # Returns
+/// A new VariantArray with all data in the value column and no typed_value 
column
+///
+/// # Errors
+/// - If the shredded data contains spec violations (e.g., field name 
conflicts)
+/// - If unsupported data types are encountered in typed_value columns
+pub fn unshred_variant(array: &VariantArray) -> Result<VariantArray> {
+    // Check if already unshredded (optimization for common case)
+    if array.typed_value_field().is_none() && array.value_field().is_some() {
+        return Ok(array.clone());
+    }
+
+    // NOTE: None/None at top-level is technically invalid, but the shredding 
spec requires us to
+    // emit `Variant::Null` when a required value is missing.
+    let mut row_builder = 
make_unshred_variant_row_builder(array.shredding_state().borrow())?
+        .unwrap_or_else(|| UnshredVariantRowBuilder::null(array.nulls()));
+
+    let metadata = array.metadata_field();
+    let mut value_builder = VariantValueArrayBuilder::new(array.len());
+    let mut null_builder = NullBufferBuilder::new(array.len());
+    for i in 0..array.len() {
+        if array.is_null(i) {
+            value_builder.append_null();
+            null_builder.append_null();
+        } else {
+            let metadata = VariantMetadata::new(metadata.value(i));
+            let mut value_builder = value_builder.builder_ext(&metadata);
+            row_builder.append_row(&mut value_builder, &metadata, i)?;
+            null_builder.append_non_null();
+        }
+    }
+
+    let value = value_builder.build()?;
+    Ok(VariantArray::from_parts(
+        metadata.clone(),
+        Some(value),
+        None,
+        null_builder.finish(),
+    ))
+}
+
+/// Row builder for converting shredded VariantArray rows back to unshredded 
form
+enum UnshredVariantRowBuilder<'a> {
+    PrimitiveInt8(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int8Type>>),
+    PrimitiveInt16(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int16Type>>),
+    PrimitiveInt32(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int32Type>>),
+    PrimitiveInt64(UnshredPrimitiveRowBuilder<'a, PrimitiveArray<Int64Type>>),
+    PrimitiveFloat32(UnshredPrimitiveRowBuilder<'a, 
PrimitiveArray<Float32Type>>),
+    PrimitiveFloat64(UnshredPrimitiveRowBuilder<'a, 
PrimitiveArray<Float64Type>>),
+    PrimitiveDate32(UnshredPrimitiveRowBuilder<'a, 
PrimitiveArray<Date32Type>>),
+    PrimitiveTime64(UnshredPrimitiveRowBuilder<'a, 
PrimitiveArray<Time64MicrosecondType>>),
+    TimestampMicrosecond(TimestampUnshredRowBuilder<'a, 
TimestampMicrosecondType>),
+    TimestampNanosecond(TimestampUnshredRowBuilder<'a, 
TimestampNanosecondType>),
+    PrimitiveBoolean(UnshredPrimitiveRowBuilder<'a, BooleanArray>),
+    PrimitiveString(UnshredPrimitiveRowBuilder<'a, StringArray>),
+    PrimitiveBinaryView(UnshredPrimitiveRowBuilder<'a, BinaryViewArray>),
+    PrimitiveUuid(UnshredPrimitiveRowBuilder<'a, FixedSizeBinaryArray>),
+    Struct(StructUnshredVariantBuilder<'a>),
+    ValueOnly(ValueOnlyUnshredVariantBuilder<'a>),
+    Null(NullUnshredVariantBuilder<'a>),
+}
+
+impl<'a> UnshredVariantRowBuilder<'a> {
+    /// Creates an all-null row builder.
+    fn null(nulls: Option<&'a NullBuffer>) -> Self {
+        Self::Null(NullUnshredVariantBuilder::new(nulls))
+    }
+
+    /// Appends a single row at the given value index to the supplied builder.
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        metadata: &VariantMetadata,
+        index: usize,
+    ) -> Result<()> {
+        match self {
+            Self::PrimitiveInt8(b) => b.append_row(builder, metadata, index),
+            Self::PrimitiveInt16(b) => b.append_row(builder, metadata, index),
+            Self::PrimitiveInt32(b) => b.append_row(builder, metadata, index),
+            Self::PrimitiveInt64(b) => b.append_row(builder, metadata, index),
+            Self::PrimitiveFloat32(b) => b.append_row(builder, metadata, 
index),
+            Self::PrimitiveFloat64(b) => b.append_row(builder, metadata, 
index),
+            Self::PrimitiveDate32(b) => b.append_row(builder, metadata, index),
+            Self::PrimitiveTime64(b) => b.append_row(builder, metadata, index),
+            Self::TimestampMicrosecond(b) => b.append_row(builder, metadata, 
index),
+            Self::TimestampNanosecond(b) => b.append_row(builder, metadata, 
index),
+            Self::PrimitiveBoolean(b) => b.append_row(builder, metadata, 
index),
+            Self::PrimitiveString(b) => b.append_row(builder, metadata, index),
+            Self::PrimitiveBinaryView(b) => b.append_row(builder, metadata, 
index),
+            Self::PrimitiveUuid(b) => b.append_row(builder, metadata, index),
+            Self::Struct(b) => b.append_row(builder, metadata, index),
+            Self::ValueOnly(b) => b.append_row(builder, metadata, index),
+            Self::Null(b) => b.append_row(builder, metadata, index),
+        }
+    }
+}
+
+/// Factory function to create the appropriate row builder for given field 
components
+/// Returns None for None/None case - caller decides how to handle based on 
context
+fn make_unshred_variant_row_builder<'a>(
+    shredding_state: BorrowedShreddingState<'a>,
+) -> Result<Option<UnshredVariantRowBuilder<'a>>> {
+    let value = shredding_state.value_field();
+    let typed_value = shredding_state.typed_value_field();
+    let Some(typed_value) = typed_value else {
+        // Copy the value across directly, if present. Else caller decides 
what to do.
+        return Ok(value
+            .map(|v| 
UnshredVariantRowBuilder::ValueOnly(ValueOnlyUnshredVariantBuilder::new(v))));
+    };
+
+    // Has typed_value -> determine type and create appropriate builder
+    macro_rules! primitive_builder {
+        ($enum_variant:ident, $cast_fn:ident) => {
+            
UnshredVariantRowBuilder::$enum_variant(UnshredPrimitiveRowBuilder::new(
+                value,
+                typed_value.$cast_fn(),
+            ))
+        };
+    }
+
+    let builder = match typed_value.data_type() {
+        DataType::Int8 => primitive_builder!(PrimitiveInt8, as_primitive),
+        DataType::Int16 => primitive_builder!(PrimitiveInt16, as_primitive),
+        DataType::Int32 => primitive_builder!(PrimitiveInt32, as_primitive),
+        DataType::Int64 => primitive_builder!(PrimitiveInt64, as_primitive),
+        DataType::Float32 => primitive_builder!(PrimitiveFloat32, 
as_primitive),
+        DataType::Float64 => primitive_builder!(PrimitiveFloat64, 
as_primitive),
+        DataType::Date32 => primitive_builder!(PrimitiveDate32, as_primitive),
+        DataType::Time64(TimeUnit::Microsecond) => {
+            primitive_builder!(PrimitiveTime64, as_primitive)
+        }
+        DataType::Time64(time_unit) => {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "Time64({time_unit}) is not a valid variant shredding type",
+            )));
+        }
+        DataType::Timestamp(TimeUnit::Microsecond, timezone) => {
+            
UnshredVariantRowBuilder::TimestampMicrosecond(TimestampUnshredRowBuilder::new(
+                value,
+                typed_value.as_primitive(),
+                timezone.is_some(),
+            ))
+        }
+        DataType::Timestamp(TimeUnit::Nanosecond, timezone) => {
+            
UnshredVariantRowBuilder::TimestampNanosecond(TimestampUnshredRowBuilder::new(
+                value,
+                typed_value.as_primitive(),
+                timezone.is_some(),
+            ))
+        }
+        DataType::Timestamp(time_unit, _) => {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "Timestamp({time_unit}) is not a valid variant shredding type",
+            )));
+        }
+        DataType::Boolean => primitive_builder!(PrimitiveBoolean, as_boolean),
+        DataType::Utf8 => primitive_builder!(PrimitiveString, as_string),
+        DataType::BinaryView => primitive_builder!(PrimitiveBinaryView, 
as_binary_view),
+        DataType::FixedSizeBinary(16) => primitive_builder!(PrimitiveUuid, 
as_fixed_size_binary),
+        DataType::FixedSizeBinary(size) => {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "FixedSizeBinary({size}) is not a valid variant shredding 
type",
+            )));
+        }
+        DataType::Struct(_) => UnshredVariantRowBuilder::Struct(
+            StructUnshredVariantBuilder::try_new(value, 
typed_value.as_struct())?,
+        ),
+        _ => {
+            return Err(ArrowError::NotYetImplemented(format!(
+                "Unshredding not yet supported for type: {}",
+                typed_value.data_type()
+            )));
+        }
+    };
+    Ok(Some(builder))
+}
+
+/// Builder for arrays with neither typed_value nor value (all 
NULL/Variant::Null)
+struct NullUnshredVariantBuilder<'a> {
+    nulls: Option<&'a NullBuffer>,
+}
+
+impl<'a> NullUnshredVariantBuilder<'a> {
+    fn new(nulls: Option<&'a NullBuffer>) -> Self {
+        Self { nulls }
+    }
+
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        _metadata: &VariantMetadata,
+        index: usize,
+    ) -> Result<()> {
+        if self.nulls.is_some_and(|nulls| nulls.is_null(index)) {
+            builder.append_null();
+        } else {
+            builder.append_value(Variant::Null);
+        }
+        Ok(())
+    }
+}
+
+/// Builder for arrays that only have value column (already unshredded)
+struct ValueOnlyUnshredVariantBuilder<'a> {
+    value: &'a arrow::array::BinaryViewArray,
+}
+
+impl<'a> ValueOnlyUnshredVariantBuilder<'a> {
+    fn new(value: &'a BinaryViewArray) -> Self {
+        Self { value }
+    }
+
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        metadata: &VariantMetadata,
+        index: usize,
+    ) -> Result<()> {
+        if self.value.is_null(index) {
+            builder.append_null();
+        } else {
+            let variant = Variant::new_with_metadata(metadata.clone(), 
self.value.value(index));
+            builder.append_value(variant);
+        }
+        Ok(())
+    }
+}
+
+/// Extension trait that directly adds row builder support for arrays that 
correspond to primitive
+/// variant types.
+trait PrimitiveVariantUnshredExt {
+    fn append_to_variant_builder(
+        &self,
+        builder: &mut impl VariantBuilderExt,
+        index: usize,
+    ) -> Result<()>;
+}
+
+/// Macro that handles the common unshredded case and returns early if handled.
+/// If not handled (shredded case), validates and returns the extracted value.
+macro_rules! handle_unshredded_case {
+    ($self:expr, $builder:expr, $metadata:expr, $index:expr, 
$partial_shredding:expr) => {{
+        let value = $self.value.as_ref().filter(|v| v.is_valid($index));
+        let value = value.map(|v| 
Variant::new_with_metadata($metadata.clone(), v.value($index)));
+
+        // If typed_value is null, handle unshredded case and return early
+        if $self.typed_value.is_null($index) {
+            match value {
+                Some(value) => $builder.append_value(value),
+                None => $builder.append_null(),
+            }
+            return Ok(());
+        }
+
+        // Only partial shredding allows value and typed_value to both be 
non-NULL
+        if !$partial_shredding && value.is_some() {
+            return Err(ArrowError::InvalidArgumentError(
+                "Invalid shredded variant: both value and typed_value are 
non-null".to_string(),
+            ));
+        }
+
+        // Return the extracted value for the partial shredded case
+        value
+    }};
+}
+
+/// Generic unshred builder that works with any typed array implementing 
VariantUnshredExt
+struct UnshredPrimitiveRowBuilder<'a, T> {
+    value: Option<&'a BinaryViewArray>,
+    typed_value: &'a T,
+}
+
+impl<'a, T: Array + PrimitiveVariantUnshredExt> UnshredPrimitiveRowBuilder<'a, 
T> {
+    fn new(value: Option<&'a BinaryViewArray>, typed_value: &'a T) -> Self {
+        Self { value, typed_value }
+    }
+
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        metadata: &VariantMetadata,
+        index: usize,
+    ) -> Result<()> {
+        handle_unshredded_case!(self, builder, metadata, index, false);
+
+        // If we get here, typed_value is valid and value is NULL
+        self.typed_value.append_to_variant_builder(builder, index)
+    }
+}
+
+// Macro to generate VariantUnshredExt implementations with optional value 
transformation
+macro_rules! impl_variant_unshred {
+    ($array_type:ty $(, |$v:ident| $transform:expr)? ) => {
+        impl PrimitiveVariantUnshredExt for $array_type {
+            fn append_to_variant_builder(
+                &self,
+                builder: &mut impl VariantBuilderExt,
+                index: usize,
+            ) -> Result<()> {
+                let value = self.value(index);
+                $(
+                    let $v = value;
+                    let value = $transform;
+                )?
+                builder.append_value(value);
+                Ok(())
+            }
+        }
+    };
+}
+
+impl_variant_unshred!(BooleanArray);
+impl_variant_unshred!(StringArray);
+impl_variant_unshred!(BinaryViewArray);
+impl_variant_unshred!(PrimitiveArray<Int8Type>);
+impl_variant_unshred!(PrimitiveArray<Int16Type>);
+impl_variant_unshred!(PrimitiveArray<Int32Type>);
+impl_variant_unshred!(PrimitiveArray<Int64Type>);
+impl_variant_unshred!(PrimitiveArray<Float32Type>);
+impl_variant_unshred!(PrimitiveArray<Float64Type>);
+
+impl_variant_unshred!(PrimitiveArray<Date32Type>, |days_since_epoch| {
+    Date32Type::to_naive_date(days_since_epoch)
+});
+
+impl_variant_unshred!(
+    PrimitiveArray<Time64MicrosecondType>,
+    |micros_since_midnight| {
+        time64us_to_time(micros_since_midnight).ok_or_else(|| {
+            ArrowError::InvalidArgumentError(format!(
+                "Invalid Time64 microsecond value: {micros_since_midnight}"
+            ))
+        })?
+    }
+);
+
+// UUID from FixedSizeBinary(16)
+// NOTE: FixedSizeBinaryArray guarantees the byte length, so we can safely 
unwrap
+impl_variant_unshred!(FixedSizeBinaryArray, |bytes| {
+    Uuid::from_slice(bytes).unwrap()
+});
+
+/// Trait for timestamp types to handle conversion to `DateTime<Utc>`
+trait TimestampType: ArrowPrimitiveType<Native = i64> {
+    fn to_datetime_utc(value: i64) -> Result<DateTime<Utc>>;
+}
+
+impl TimestampType for TimestampMicrosecondType {
+    fn to_datetime_utc(micros: i64) -> Result<DateTime<Utc>> {
+        DateTime::from_timestamp_micros(micros).ok_or_else(|| {
+            ArrowError::InvalidArgumentError(format!(
+                "Invalid timestamp microsecond value: {micros}"
+            ))
+        })
+    }
+}
+
+impl TimestampType for TimestampNanosecondType {
+    fn to_datetime_utc(nanos: i64) -> Result<DateTime<Utc>> {
+        Ok(DateTime::from_timestamp_nanos(nanos))
+    }
+}
+
+/// Generic builder for timestamp types that handles timezone-aware conversion
+struct TimestampUnshredRowBuilder<'a, T: TimestampType> {
+    value: Option<&'a BinaryViewArray>,
+    typed_value: &'a PrimitiveArray<T>,
+    has_timezone: bool,
+}
+
+impl<'a, T: TimestampType> TimestampUnshredRowBuilder<'a, T> {
+    fn new(
+        value: Option<&'a BinaryViewArray>,
+        typed_value: &'a PrimitiveArray<T>,
+        has_timezone: bool,
+    ) -> Self {
+        Self {
+            value,
+            typed_value,
+            has_timezone,
+        }
+    }
+
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        metadata: &VariantMetadata,
+        index: usize,
+    ) -> Result<()> {
+        handle_unshredded_case!(self, builder, metadata, index, false);
+
+        // If we get here, typed_value is valid and value is NULL
+        let timestamp_value = self.typed_value.value(index);
+        let dt = T::to_datetime_utc(timestamp_value)?;
+        if self.has_timezone {
+            builder.append_value(dt);
+        } else {
+            builder.append_value(dt.naive_utc());
+        }
+        Ok(())
+    }
+}
+
+/// Builder for unshredding struct/object types with nested fields
+struct StructUnshredVariantBuilder<'a> {
+    value: Option<&'a arrow::array::BinaryViewArray>,
+    typed_value: &'a arrow::array::StructArray,
+    field_unshredders: IndexMap<&'a str, Option<UnshredVariantRowBuilder<'a>>>,
+}
+
+impl<'a> StructUnshredVariantBuilder<'a> {
+    fn try_new(value: Option<&'a BinaryViewArray>, typed_value: &'a 
StructArray) -> Result<Self> {
+        // Create unshredders for each field in constructor
+        let mut field_unshredders = IndexMap::new();
+        for (field, field_array) in 
typed_value.fields().iter().zip(typed_value.columns()) {
+            // Factory returns None for None/None case -- these are missing 
fields we should skip
+            let Some(field_array) = field_array.as_struct_opt() else {
+                return Err(ArrowError::InvalidArgumentError(format!(
+                    "Invalid shredded variant object field: expected Struct, 
got {}",
+                    field_array.data_type()
+                )));
+            };
+            let field_unshredder = 
make_unshred_variant_row_builder(field_array.try_into()?)?;
+            field_unshredders.insert(field.name().as_ref(), field_unshredder);
+        }
+
+        Ok(Self {
+            value,
+            typed_value,
+            field_unshredders,
+        })
+    }
+
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        metadata: &VariantMetadata,
+        index: usize,
+    ) -> Result<()> {
+        let value = handle_unshredded_case!(self, builder, metadata, index, 
true);
+
+        // If we get here, typed_value is valid and value may or may not be 
valid
+        let mut object_builder = builder.try_new_object()?;
+
+        // Process typed fields (skip empty builders that indicate missing 
fields)
+        for (field_name, field_unshredder_opt) in &mut self.field_unshredders {
+            if let Some(field_unshredder) = field_unshredder_opt {
+                let mut field_builder = ObjectFieldBuilder::new(field_name, 
&mut object_builder);
+                field_unshredder.append_row(&mut field_builder, metadata, 
index)?;
+            }
+        }
+
+        // Process any unshredded fields (partial shredding)
+        if let Some(value) = value {
+            let Variant::Object(object) = value else {
+                return Err(ArrowError::InvalidArgumentError(
+                    "Expected object in value field for partially shredded 
struct".to_string(),
+                ));
+            };
+
+            for (field_name, field_value) in object.iter() {
+                if self.field_unshredders.contains_key(field_name) {
+                    return Err(ArrowError::InvalidArgumentError(format!(
+                        "Field '{field_name}' appears in both typed_value and 
value",
+                    )));
+                }
+                object_builder.insert_bytes(field_name, field_value);
+            }
+        }
+
+        object_builder.finish();
+        Ok(())
+    }
+}

Review Comment:
   I was wondering if we should have any tests for this kernel directly
   
   I do realize the tests in parquet/tests/variant_integration.rs cover it 
pretty thoroughly
   
   Maybe we can leave a comment to that effect
   
   ```suggestion
   }
   // Note: code is covered by tests in `parquet/tests/variant_integration.rs`
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to