This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new dc9c3cdd2c [Variant] Implement row builders for cast_to_variant (#8299)
dc9c3cdd2c is described below

commit dc9c3cdd2c89a01d8f71c5b9a8ad1d4d9e9dc772
Author: Ryan Johnson <scov...@users.noreply.github.com>
AuthorDate: Wed Sep 10 12:00:02 2025 -0700

    [Variant] Implement row builders for cast_to_variant (#8299)
    
    Note to reviewers: This PR includes 1600+ LoC of new unit tests. The
    actual changes are half that big.
    
    # Which issue does this PR close?
    
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax.
    
    - Closes https://github.com/apache/arrow-rs/issues/8310
    
    # Rationale for this change
    
    The original `cast_to_variant` code did columnar conversions of types to
    variant. For primitive types this worked ok, but for deeply nested types
    it meant repeatedly creating new variants (and variant metadata), only
    to re-encode them by copying the variant values to new arrays (with new
    metadata and field ids), which is very expensive.
    
    # What changes are included in this PR?
    
    Follow the example of https://github.com/apache/arrow-rs/pull/8280, and
    introduce a row builder concept that takes individual array values and
    writes them to an `impl VariantBuilderExt`. Row builders for complex
    types instantiate the appropriate list or object builder to pass to
    their children.
    
    # Are these changes tested?
    
    Existing unit tests continue to pass. Extensive new unit tests added as
    well.
    
    # Are there any user-facing changes?
    
    * `VariantBuilderExt` has a new `append_null` method.
    * `ObjectFieldBuilder` was moved to `builder.rs` and made public.
---
 parquet-variant-compute/src/arrow_to_variant.rs    | 1995 ++++++++++++++++++++
 parquet-variant-compute/src/cast_to_variant.rs     |  694 +------
 parquet-variant-compute/src/lib.rs                 |    4 +-
 parquet-variant-compute/src/type_conversion.rs     |  104 +-
 .../src/variant_array_builder.rs                   |   24 +-
 parquet-variant-json/src/from_json.rs              |   26 +-
 parquet-variant/src/builder.rs                     |   41 +
 7 files changed, 2151 insertions(+), 737 deletions(-)

diff --git a/parquet-variant-compute/src/arrow_to_variant.rs 
b/parquet-variant-compute/src/arrow_to_variant.rs
new file mode 100644
index 0000000000..c08990de69
--- /dev/null
+++ b/parquet-variant-compute/src/arrow_to_variant.rs
@@ -0,0 +1,1995 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use crate::type_conversion::{decimal_to_variant_decimal, CastOptions};
+use arrow::array::{
+    Array, AsArray, GenericBinaryArray, GenericStringArray, OffsetSizeTrait, 
PrimitiveArray,
+};
+use arrow::compute::kernels::cast;
+use arrow::datatypes::{
+    ArrowNativeType, ArrowPrimitiveType, ArrowTemporalType, 
ArrowTimestampType, Date32Type,
+    Date64Type, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, 
Int64Type, Int8Type,
+    RunEndIndexType, Time32MillisecondType, Time32SecondType, 
Time64MicrosecondType,
+    Time64NanosecondType, TimestampMicrosecondType, TimestampMillisecondType,
+    TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, 
UInt64Type, UInt8Type,
+};
+use arrow::temporal_conversions::{as_date, as_datetime, as_time};
+use arrow_schema::{ArrowError, DataType, TimeUnit};
+use chrono::{DateTime, TimeZone, Utc};
+use parquet_variant::{
+    ObjectFieldBuilder, Variant, VariantBuilderExt, VariantDecimal16, 
VariantDecimal4,
+    VariantDecimal8,
+};
+
+// ============================================================================
+// Row-oriented builders for efficient Arrow-to-Variant conversion
+// ============================================================================
+
+/// Row builder for converting Arrow arrays to VariantArray row by row
+pub(crate) enum ArrowToVariantRowBuilder<'a> {
+    Null(NullArrowToVariantBuilder),
+    Boolean(BooleanArrowToVariantBuilder<'a>),
+    PrimitiveInt8(PrimitiveArrowToVariantBuilder<'a, Int8Type>),
+    PrimitiveInt16(PrimitiveArrowToVariantBuilder<'a, Int16Type>),
+    PrimitiveInt32(PrimitiveArrowToVariantBuilder<'a, Int32Type>),
+    PrimitiveInt64(PrimitiveArrowToVariantBuilder<'a, Int64Type>),
+    PrimitiveUInt8(PrimitiveArrowToVariantBuilder<'a, UInt8Type>),
+    PrimitiveUInt16(PrimitiveArrowToVariantBuilder<'a, UInt16Type>),
+    PrimitiveUInt32(PrimitiveArrowToVariantBuilder<'a, UInt32Type>),
+    PrimitiveUInt64(PrimitiveArrowToVariantBuilder<'a, UInt64Type>),
+    PrimitiveFloat16(PrimitiveArrowToVariantBuilder<'a, Float16Type>),
+    PrimitiveFloat32(PrimitiveArrowToVariantBuilder<'a, Float32Type>),
+    PrimitiveFloat64(PrimitiveArrowToVariantBuilder<'a, Float64Type>),
+    Decimal32(Decimal32ArrowToVariantBuilder<'a>),
+    Decimal64(Decimal64ArrowToVariantBuilder<'a>),
+    Decimal128(Decimal128ArrowToVariantBuilder<'a>),
+    Decimal256(Decimal256ArrowToVariantBuilder<'a>),
+    TimestampSecond(TimestampArrowToVariantBuilder<'a, TimestampSecondType>),
+    TimestampMillisecond(TimestampArrowToVariantBuilder<'a, 
TimestampMillisecondType>),
+    TimestampMicrosecond(TimestampArrowToVariantBuilder<'a, 
TimestampMicrosecondType>),
+    TimestampNanosecond(TimestampArrowToVariantBuilder<'a, 
TimestampNanosecondType>),
+    Date32(DateArrowToVariantBuilder<'a, Date32Type>),
+    Date64(DateArrowToVariantBuilder<'a, Date64Type>),
+    Time32Second(TimeArrowToVariantBuilder<'a, Time32SecondType>),
+    Time32Millisecond(TimeArrowToVariantBuilder<'a, Time32MillisecondType>),
+    Time64Microsecond(TimeArrowToVariantBuilder<'a, Time64MicrosecondType>),
+    Time64Nanosecond(TimeArrowToVariantBuilder<'a, Time64NanosecondType>),
+    Binary(BinaryArrowToVariantBuilder<'a, i32>),
+    LargeBinary(BinaryArrowToVariantBuilder<'a, i64>),
+    BinaryView(BinaryViewArrowToVariantBuilder<'a>),
+    FixedSizeBinary(FixedSizeBinaryArrowToVariantBuilder<'a>),
+    Utf8(StringArrowToVariantBuilder<'a, i32>),
+    LargeUtf8(StringArrowToVariantBuilder<'a, i64>),
+    Utf8View(StringViewArrowToVariantBuilder<'a>),
+    List(ListArrowToVariantBuilder<'a, i32>),
+    LargeList(ListArrowToVariantBuilder<'a, i64>),
+    Struct(StructArrowToVariantBuilder<'a>),
+    Map(MapArrowToVariantBuilder<'a>),
+    Union(UnionArrowToVariantBuilder<'a>),
+    Dictionary(DictionaryArrowToVariantBuilder<'a>),
+    RunEndEncodedInt16(RunEndEncodedArrowToVariantBuilder<'a, Int16Type>),
+    RunEndEncodedInt32(RunEndEncodedArrowToVariantBuilder<'a, Int32Type>),
+    RunEndEncodedInt64(RunEndEncodedArrowToVariantBuilder<'a, Int64Type>),
+}
+
+impl<'a> ArrowToVariantRowBuilder<'a> {
+    /// Appends a single row at the given index to the supplied builder.
+    pub fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        index: usize,
+    ) -> Result<(), ArrowError> {
+        use ArrowToVariantRowBuilder::*;
+        match self {
+            Null(b) => b.append_row(builder, index),
+            Boolean(b) => b.append_row(builder, index),
+            PrimitiveInt8(b) => b.append_row(builder, index),
+            PrimitiveInt16(b) => b.append_row(builder, index),
+            PrimitiveInt32(b) => b.append_row(builder, index),
+            PrimitiveInt64(b) => b.append_row(builder, index),
+            PrimitiveUInt8(b) => b.append_row(builder, index),
+            PrimitiveUInt16(b) => b.append_row(builder, index),
+            PrimitiveUInt32(b) => b.append_row(builder, index),
+            PrimitiveUInt64(b) => b.append_row(builder, index),
+            PrimitiveFloat16(b) => b.append_row(builder, index),
+            PrimitiveFloat32(b) => b.append_row(builder, index),
+            PrimitiveFloat64(b) => b.append_row(builder, index),
+            Decimal32(b) => b.append_row(builder, index),
+            Decimal64(b) => b.append_row(builder, index),
+            Decimal128(b) => b.append_row(builder, index),
+            Decimal256(b) => b.append_row(builder, index),
+            TimestampSecond(b) => b.append_row(builder, index),
+            TimestampMillisecond(b) => b.append_row(builder, index),
+            TimestampMicrosecond(b) => b.append_row(builder, index),
+            TimestampNanosecond(b) => b.append_row(builder, index),
+            Date32(b) => b.append_row(builder, index),
+            Date64(b) => b.append_row(builder, index),
+            Time32Second(b) => b.append_row(builder, index),
+            Time32Millisecond(b) => b.append_row(builder, index),
+            Time64Microsecond(b) => b.append_row(builder, index),
+            Time64Nanosecond(b) => b.append_row(builder, index),
+            Binary(b) => b.append_row(builder, index),
+            LargeBinary(b) => b.append_row(builder, index),
+            BinaryView(b) => b.append_row(builder, index),
+            FixedSizeBinary(b) => b.append_row(builder, index),
+            Utf8(b) => b.append_row(builder, index),
+            LargeUtf8(b) => b.append_row(builder, index),
+            Utf8View(b) => b.append_row(builder, index),
+            List(b) => b.append_row(builder, index),
+            LargeList(b) => b.append_row(builder, index),
+            Struct(b) => b.append_row(builder, index),
+            Map(b) => b.append_row(builder, index),
+            Union(b) => b.append_row(builder, index),
+            Dictionary(b) => b.append_row(builder, index),
+            RunEndEncodedInt16(b) => b.append_row(builder, index),
+            RunEndEncodedInt32(b) => b.append_row(builder, index),
+            RunEndEncodedInt64(b) => b.append_row(builder, index),
+        }
+    }
+}
+
+/// Factory function to create the appropriate row builder for a given DataType
+pub(crate) fn make_arrow_to_variant_row_builder<'a>(
+    data_type: &'a DataType,
+    array: &'a dyn Array,
+    options: &'a CastOptions,
+) -> Result<ArrowToVariantRowBuilder<'a>, ArrowError> {
+    use ArrowToVariantRowBuilder::*;
+    let builder =
+        match data_type {
+            DataType::Null => Null(NullArrowToVariantBuilder),
+            DataType::Boolean => 
Boolean(BooleanArrowToVariantBuilder::new(array)),
+            DataType::Int8 => 
PrimitiveInt8(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::Int16 => 
PrimitiveInt16(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::Int32 => 
PrimitiveInt32(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::Int64 => 
PrimitiveInt64(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::UInt8 => 
PrimitiveUInt8(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::UInt16 => 
PrimitiveUInt16(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::UInt32 => 
PrimitiveUInt32(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::UInt64 => 
PrimitiveUInt64(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::Float16 => 
PrimitiveFloat16(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::Float32 => 
PrimitiveFloat32(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::Float64 => 
PrimitiveFloat64(PrimitiveArrowToVariantBuilder::new(array)),
+            DataType::Decimal32(_, scale) => {
+                Decimal32(Decimal32ArrowToVariantBuilder::new(array, *scale))
+            }
+            DataType::Decimal64(_, scale) => {
+                Decimal64(Decimal64ArrowToVariantBuilder::new(array, *scale))
+            }
+            DataType::Decimal128(_, scale) => {
+                Decimal128(Decimal128ArrowToVariantBuilder::new(array, *scale))
+            }
+            DataType::Decimal256(_, scale) => {
+                Decimal256(Decimal256ArrowToVariantBuilder::new(array, *scale))
+            }
+            DataType::Timestamp(time_unit, time_zone) => {
+                match time_unit {
+                    TimeUnit::Second => 
TimestampSecond(TimestampArrowToVariantBuilder::new(
+                        array,
+                        options,
+                        time_zone.is_some(),
+                    )),
+                    TimeUnit::Millisecond => TimestampMillisecond(
+                        TimestampArrowToVariantBuilder::new(array, options, 
time_zone.is_some()),
+                    ),
+                    TimeUnit::Microsecond => TimestampMicrosecond(
+                        TimestampArrowToVariantBuilder::new(array, options, 
time_zone.is_some()),
+                    ),
+                    TimeUnit::Nanosecond => TimestampNanosecond(
+                        TimestampArrowToVariantBuilder::new(array, options, 
time_zone.is_some()),
+                    ),
+                }
+            }
+            DataType::Date32 => Date32(DateArrowToVariantBuilder::new(array, 
options)),
+            DataType::Date64 => Date64(DateArrowToVariantBuilder::new(array, 
options)),
+            DataType::Time32(time_unit) => match time_unit {
+                TimeUnit::Second => 
Time32Second(TimeArrowToVariantBuilder::new(array, options)),
+                TimeUnit::Millisecond => {
+                    Time32Millisecond(TimeArrowToVariantBuilder::new(array, 
options))
+                }
+                _ => {
+                    return Err(ArrowError::CastError(format!(
+                        "Unsupported Time32 unit: {time_unit:?}"
+                    )))
+                }
+            },
+            DataType::Time64(time_unit) => match time_unit {
+                TimeUnit::Microsecond => {
+                    Time64Microsecond(TimeArrowToVariantBuilder::new(array, 
options))
+                }
+                TimeUnit::Nanosecond => {
+                    Time64Nanosecond(TimeArrowToVariantBuilder::new(array, 
options))
+                }
+                _ => {
+                    return Err(ArrowError::CastError(format!(
+                        "Unsupported Time64 unit: {time_unit:?}"
+                    )))
+                }
+            },
+            DataType::Duration(_) | DataType::Interval(_) => {
+                return Err(ArrowError::InvalidArgumentError(
+                    "Casting duration/interval types to Variant is not 
supported. \
+                 The Variant format does not define duration/interval types."
+                        .to_string(),
+                ))
+            }
+            DataType::Binary => 
Binary(BinaryArrowToVariantBuilder::new(array)),
+            DataType::LargeBinary => 
LargeBinary(BinaryArrowToVariantBuilder::new(array)),
+            DataType::BinaryView => 
BinaryView(BinaryViewArrowToVariantBuilder::new(array)),
+            DataType::FixedSizeBinary(_) => {
+                
FixedSizeBinary(FixedSizeBinaryArrowToVariantBuilder::new(array))
+            }
+            DataType::Utf8 => Utf8(StringArrowToVariantBuilder::new(array)),
+            DataType::LargeUtf8 => 
LargeUtf8(StringArrowToVariantBuilder::new(array)),
+            DataType::Utf8View => 
Utf8View(StringViewArrowToVariantBuilder::new(array)),
+            DataType::List(_) => List(ListArrowToVariantBuilder::new(array, 
options)?),
+            DataType::LargeList(_) => 
LargeList(ListArrowToVariantBuilder::new(array, options)?),
+            DataType::Struct(_) => Struct(StructArrowToVariantBuilder::new(
+                array.as_struct(),
+                options,
+            )?),
+            DataType::Map(_, _) => Map(MapArrowToVariantBuilder::new(array, 
options)?),
+            DataType::Union(_, _) => 
Union(UnionArrowToVariantBuilder::new(array, options)?),
+            DataType::Dictionary(_, _) => {
+                Dictionary(DictionaryArrowToVariantBuilder::new(array, 
options)?)
+            }
+            DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() 
{
+                DataType::Int16 => {
+                    
RunEndEncodedInt16(RunEndEncodedArrowToVariantBuilder::new(array, options)?)
+                }
+                DataType::Int32 => {
+                    
RunEndEncodedInt32(RunEndEncodedArrowToVariantBuilder::new(array, options)?)
+                }
+                DataType::Int64 => {
+                    
RunEndEncodedInt64(RunEndEncodedArrowToVariantBuilder::new(array, options)?)
+                }
+                _ => {
+                    return Err(ArrowError::CastError(format!(
+                        "Unsupported run ends type: {:?}",
+                        run_ends.data_type()
+                    )));
+                }
+            },
+            dt => {
+                return Err(ArrowError::CastError(format!(
+                    "Unsupported data type for casting to Variant: {dt:?}",
+                )));
+            }
+        };
+    Ok(builder)
+}
+
+/// Macro to define (possibly generic) row builders with consistent structure 
and behavior.
+///
+/// The macro optionally allows defining a transform for values read from the 
underlying
+/// array. Transforms of the form `|value| { ... }` are infallible (and should 
produce something
+/// that implements `Into<Variant>`), while transforms of the form `|value| -> 
Option<_> { ... }`
+/// are fallible (and should produce `Option<impl Into<Variant>>`); a failed 
transform will either
+/// append null to the builder or return an error, depending on cast options.
+///
+/// Also supports optional extra fields that are passed to the constructor and 
which are available
+/// by reference in the value transform. Providing a fallible value transform 
requires also
+/// providing the extra field `options: &'a CastOptions`.
+// TODO: If/when the macro_metavar_expr feature stabilizes, the `ignore` 
meta-function would allow
+// us to "use" captured tokens without emitting them:
+//
+// ```
+// $(
+//     ${ignore($value)}
+//     $(
+//         ${ignore($option_ty)}
+//         options: &$lifetime CastOptions,
+//     )?
+// )?
+// ```
+//
+// That, in turn, would allow us to inject the `options` field whenever the 
user specifies a
+// fallible value transform, instead of requiring them to manually define it. 
This might not be
+// worth the trouble, though, because it makes for some pretty bulky and unwieldy 
macro expansions.
+macro_rules! define_row_builder {
+    (
+        struct $name:ident<$lifetime:lifetime $(, $generic:ident: $bound:path 
)?>
+        $( where $where_path:path: $where_bound:path $(,)? )?
+        $({ $($field:ident: $field_type:ty),+ $(,)? })?,
+        |$array_param:ident| -> $array_type:ty { $init_expr:expr }
+        $(, |$value:ident| $(-> Option<$option_ty:ty>)? $value_transform:expr)?
+    ) => {
+        pub(crate) struct $name<$lifetime $(, $generic: $bound )?>
+        $( where $where_path: $where_bound )?
+        {
+            array: &$lifetime $array_type,
+            $( $( $field: $field_type, )+ )?
+        }
+
+        impl<$lifetime $(, $generic: $bound+ )?> $name<$lifetime $(, 
$generic)?>
+        $( where $where_path: $where_bound )?
+        {
+            pub(crate) fn new($array_param: &$lifetime dyn Array $(, $( 
$field: $field_type ),+ )?) -> Self {
+                Self {
+                    array: $init_expr,
+                    $( $( $field, )+ )?
+                }
+            }
+
+            fn append_row(&self, builder: &mut impl VariantBuilderExt, index: 
usize) -> Result<(), ArrowError> {
+                if self.array.is_null(index) {
+                    builder.append_null();
+                } else {
+                    // Macro hygiene: Give any extra fields names the value 
transform can access.
+                    //
+                    // The value transform doesn't normally reference cast 
options, but the macro's
+                    // caller still has to declare the field because stable 
rust has no way to "use"
+                    // a captured token without emitting it. So, silence 
unused variable warnings,
+                    // assuming that's the `options` field. Unfortunately, 
that also silences
+                    // legitimate compiler warnings if an infallible value 
transform fails to use
+                    // its first extra field.
+                    $(
+                        #[allow(unused)]
+                        $( let $field = &self.$field; )+
+                    )?
+
+                    // Apply the value transform, if any (with name swapping 
for hygiene)
+                    let value = self.array.value(index);
+                    $(
+                        let $value = value;
+                        let value = $value_transform;
+                        $(
+                            // NOTE: The `?` macro expansion fails without the 
type annotation.
+                            let Some(value): Option<$option_ty> = value else {
+                                if self.options.strict {
+                                    return 
Err(ArrowError::ComputeError(format!(
+                                        "Failed to convert value at index 
{index}: conversion failed",
+                                    )));
+                                } else {
+                                    builder.append_null();
+                                    return Ok(());
+                                }
+                            };
+                        )?
+                    )?
+                    builder.append_value(value);
+                }
+                Ok(())
+            }
+        }
+    };
+}
+
+define_row_builder!(
+    struct BooleanArrowToVariantBuilder<'a>,
+    |array| -> arrow::array::BooleanArray { array.as_boolean() }
+);
+
+define_row_builder!(
+    struct PrimitiveArrowToVariantBuilder<'a, T: ArrowPrimitiveType>
+    where T::Native: Into<Variant<'a, 'a>>,
+    |array| -> PrimitiveArray<T> { array.as_primitive() }
+);
+
+define_row_builder!(
+    struct Decimal32ArrowToVariantBuilder<'a> {
+        scale: i8,
+    },
+    |array| -> arrow::array::Decimal32Array { array.as_primitive() },
+    |value| decimal_to_variant_decimal!(value, scale, i32, VariantDecimal4)
+);
+
+define_row_builder!(
+    struct Decimal64ArrowToVariantBuilder<'a> {
+        scale: i8,
+    },
+    |array| -> arrow::array::Decimal64Array { array.as_primitive() },
+    |value| decimal_to_variant_decimal!(value, scale, i64, VariantDecimal8)
+);
+
+define_row_builder!(
+    struct Decimal128ArrowToVariantBuilder<'a> {
+        scale: i8,
+    },
+    |array| -> arrow::array::Decimal128Array { array.as_primitive() },
+    |value| decimal_to_variant_decimal!(value, scale, i128, VariantDecimal16)
+);
+
+define_row_builder!(
+    struct Decimal256ArrowToVariantBuilder<'a> {
+        scale: i8,
+    },
+    |array| -> arrow::array::Decimal256Array { array.as_primitive() },
+    |value| {
+        // Decimal256 needs special handling - convert to i128 if possible
+        match value.to_i128() {
+            Some(i128_val) => decimal_to_variant_decimal!(i128_val, scale, 
i128, VariantDecimal16),
+            None => Variant::Null, // Value too large for i128
+        }
+    }
+);
+
+define_row_builder!(
+    struct TimestampArrowToVariantBuilder<'a, T: ArrowTimestampType> {
+        options: &'a CastOptions,
+        has_time_zone: bool,
+    },
+    |array| -> arrow::array::PrimitiveArray<T> { array.as_primitive() },
+    |value| -> Option<_> {
+        // Convert using Arrow's temporal conversion functions
+        as_datetime::<T>(value).map(|naive_datetime| {
+            if *has_time_zone {
+                // Has timezone -> DateTime<Utc> -> 
TimestampMicros/TimestampNanos
+                let utc_dt: DateTime<Utc> = 
Utc.from_utc_datetime(&naive_datetime);
+                Variant::from(utc_dt) // Uses From<DateTime<Utc>> for Variant
+            } else {
+                // No timezone -> NaiveDateTime -> 
TimestampNtzMicros/TimestampNtzNanos
+                Variant::from(naive_datetime) // Uses From<NaiveDateTime> for 
Variant
+            }
+        })
+    }
+);
+
+define_row_builder!(
+    struct DateArrowToVariantBuilder<'a, T: ArrowTemporalType>
+    where
+        i64: From<T::Native>,
+    {
+        options: &'a CastOptions,
+    },
+    |array| -> PrimitiveArray<T> { array.as_primitive() },
+    |value| -> Option<_> {
+        let date_value = i64::from(value);
+        as_date::<T>(date_value)
+    }
+);
+
+define_row_builder!(
+    struct TimeArrowToVariantBuilder<'a, T: ArrowTemporalType>
+    where
+        i64: From<T::Native>,
+    {
+        options: &'a CastOptions,
+    },
+    |array| -> PrimitiveArray<T> { array.as_primitive() },
+    |value| -> Option<_> {
+        let time_value = i64::from(value);
+        as_time::<T>(time_value)
+    }
+);
+
+define_row_builder!(
+    struct BinaryArrowToVariantBuilder<'a, O: OffsetSizeTrait>,
+    |array| -> GenericBinaryArray<O> { array.as_binary() }
+);
+
+define_row_builder!(
+    struct BinaryViewArrowToVariantBuilder<'a>,
+    |array| -> arrow::array::BinaryViewArray { array.as_byte_view() }
+);
+
+define_row_builder!(
+    struct FixedSizeBinaryArrowToVariantBuilder<'a>,
+    |array| -> arrow::array::FixedSizeBinaryArray { 
array.as_fixed_size_binary() }
+);
+
+define_row_builder!(
+    struct StringArrowToVariantBuilder<'a, O: OffsetSizeTrait>,
+    |array| -> GenericStringArray<O> { array.as_string() }
+);
+
+define_row_builder!(
+    struct StringViewArrowToVariantBuilder<'a>,
+    |array| -> arrow::array::StringViewArray { array.as_string_view() }
+);
+
+/// Null builder that always appends null
+pub(crate) struct NullArrowToVariantBuilder;
+
+impl NullArrowToVariantBuilder {
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        _index: usize,
+    ) -> Result<(), ArrowError> {
+        builder.append_null();
+        Ok(())
+    }
+}
+
+/// Generic list builder for List and LargeList types
+pub(crate) struct ListArrowToVariantBuilder<'a, O: OffsetSizeTrait> {
+    list_array: &'a arrow::array::GenericListArray<O>,
+    values_builder: Box<ArrowToVariantRowBuilder<'a>>,
+}
+
+impl<'a, O: OffsetSizeTrait> ListArrowToVariantBuilder<'a, O> {
+    pub(crate) fn new(array: &'a dyn Array, options: &'a CastOptions) -> 
Result<Self, ArrowError> {
+        let list_array = array.as_list();
+        let values = list_array.values();
+        let values_builder =
+            make_arrow_to_variant_row_builder(values.data_type(), 
values.as_ref(), options)?;
+
+        Ok(Self {
+            list_array,
+            values_builder: Box::new(values_builder),
+        })
+    }
+
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        index: usize,
+    ) -> Result<(), ArrowError> {
+        if self.list_array.is_null(index) {
+            builder.append_null();
+            return Ok(());
+        }
+
+        let offsets = self.list_array.offsets();
+        let start = offsets[index].as_usize();
+        let end = offsets[index + 1].as_usize();
+
+        let mut list_builder = builder.try_new_list()?;
+        for value_index in start..end {
+            self.values_builder
+                .append_row(&mut list_builder, value_index)?;
+        }
+        list_builder.finish();
+        Ok(())
+    }
+}
+
+/// Struct builder for StructArray
+pub(crate) struct StructArrowToVariantBuilder<'a> {
+    struct_array: &'a arrow::array::StructArray,
+    field_builders: Vec<(&'a str, ArrowToVariantRowBuilder<'a>)>,
+}
+
+impl<'a> StructArrowToVariantBuilder<'a> {
+    pub(crate) fn new(
+        struct_array: &'a arrow::array::StructArray,
+        options: &'a CastOptions,
+    ) -> Result<Self, ArrowError> {
+        let mut field_builders = Vec::new();
+
+        // Create a row builder for each field
+        for (field_name, field_array) in struct_array
+            .column_names()
+            .iter()
+            .zip(struct_array.columns().iter())
+        {
+            let field_builder = make_arrow_to_variant_row_builder(
+                field_array.data_type(),
+                field_array.as_ref(),
+                options,
+            )?;
+            field_builders.push((*field_name, field_builder));
+        }
+
+        Ok(Self {
+            struct_array,
+            field_builders,
+        })
+    }
+
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        index: usize,
+    ) -> Result<(), ArrowError> {
+        if self.struct_array.is_null(index) {
+            builder.append_null();
+        } else {
+            // Create object builder for this struct row
+            let mut obj_builder = builder.try_new_object()?;
+
+            // Process each field
+            for (field_name, row_builder) in &mut self.field_builders {
+                let mut field_builder =
+                    parquet_variant::ObjectFieldBuilder::new(field_name, &mut 
obj_builder);
+                row_builder.append_row(&mut field_builder, index)?;
+            }
+
+            obj_builder.finish();
+        }
+        Ok(())
+    }
+}
+
+/// Map builder for MapArray types
+pub(crate) struct MapArrowToVariantBuilder<'a> {
+    map_array: &'a arrow::array::MapArray,
+    key_strings: arrow::array::StringArray,
+    values_builder: Box<ArrowToVariantRowBuilder<'a>>,
+}
+
+impl<'a> MapArrowToVariantBuilder<'a> {
+    pub(crate) fn new(array: &'a dyn Array, options: &'a CastOptions) -> 
Result<Self, ArrowError> {
+        let map_array = array.as_map();
+
+        // Pre-cast keys to strings once
+        let keys = cast(map_array.keys(), &DataType::Utf8)?;
+        let key_strings = keys.as_string::<i32>().clone();
+
+        // Create recursive builder for values
+        let values = map_array.values();
+        let values_builder =
+            make_arrow_to_variant_row_builder(values.data_type(), 
values.as_ref(), options)?;
+
+        Ok(Self {
+            map_array,
+            key_strings,
+            values_builder: Box::new(values_builder),
+        })
+    }
+
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        index: usize,
+    ) -> Result<(), ArrowError> {
+        // Check for NULL map first (via null bitmap)
+        if self.map_array.is_null(index) {
+            builder.append_null();
+            return Ok(());
+        }
+
+        let offsets = self.map_array.offsets();
+        let start = offsets[index].as_usize();
+        let end = offsets[index + 1].as_usize();
+
+        // Create object builder for this map
+        let mut object_builder = builder.try_new_object()?;
+
+        // Add each key-value pair (loop does nothing for empty maps - 
correct!)
+        for kv_index in start..end {
+            let key = self.key_strings.value(kv_index);
+            let mut field_builder = ObjectFieldBuilder::new(key, &mut 
object_builder);
+            self.values_builder
+                .append_row(&mut field_builder, kv_index)?;
+        }
+
+        object_builder.finish();
+        Ok(())
+    }
+}
+
+/// Union builder for both sparse and dense union arrays
+///
+/// NOTE: Union type ids are _not_ required to be dense, hence the hash map for child builders.
+pub(crate) struct UnionArrowToVariantBuilder<'a> {
+    union_array: &'a arrow::array::UnionArray,
+    child_builders: HashMap<i8, Box<ArrowToVariantRowBuilder<'a>>>,
+}
+
+impl<'a> UnionArrowToVariantBuilder<'a> {
+    /// Creates a union row builder over `array`, which is expected to be a `UnionArray`.
+    ///
+    /// One child row builder is created for every distinct type id that occurs in the array.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if a row builder cannot be created for one of the referenced children.
+    pub(crate) fn new(array: &'a dyn Array, options: &'a CastOptions) -> Result<Self, ArrowError> {
+        let union_array = array.as_union();
+        let type_ids = union_array.type_ids();
+
+        // `type_ids` holds one entry per row, so the same id normally shows up many times.
+        // Child builders can be costly to construct (e.g. they may cast or normalize whole
+        // child arrays), so build each one only the first time its type id is seen.
+        let mut child_builders: HashMap<i8, Box<ArrowToVariantRowBuilder<'a>>> = HashMap::new();
+        for &type_id in type_ids {
+            if let std::collections::hash_map::Entry::Vacant(slot) = child_builders.entry(type_id) {
+                let child_array = union_array.child(type_id);
+                let child_builder = make_arrow_to_variant_row_builder(
+                    child_array.data_type(),
+                    child_array.as_ref(),
+                    options,
+                )?;
+                slot.insert(Box::new(child_builder));
+            }
+        }
+
+        Ok(Self {
+            union_array,
+            child_builders,
+        })
+    }
+
+    /// Appends the union value at row `index` to `builder`.
+    ///
+    /// The row's type id selects the child builder and the row's value offset selects the
+    /// position within that child.
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        index: usize,
+    ) -> Result<(), ArrowError> {
+        let type_id = self.union_array.type_id(index);
+        let value_offset = self.union_array.value_offset(index);
+
+        // Delegate to the appropriate child builder, or append null to handle an invalid type_id
+        match self.child_builders.get_mut(&type_id) {
+            Some(child_builder) => child_builder.append_row(builder, value_offset)?,
+            None => builder.append_null(),
+        }
+
+        Ok(())
+    }
+}
+
+/// Row builder for dictionary arrays.
+///
+/// Keys are normalized to `usize` once at construction, so appending a row is a plain index
+/// into that vector followed by a delegation to the builder for the dictionary values.
+pub(crate) struct DictionaryArrowToVariantBuilder<'a> {
+    keys: &'a dyn Array, // consulted only for null checks
+    normalized_keys: Vec<usize>,
+    values_builder: Box<ArrowToVariantRowBuilder<'a>>,
+}
+
+impl<'a> DictionaryArrowToVariantBuilder<'a> {
+    /// Creates a dictionary row builder over `array`, which is expected to be a dictionary array.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if no row builder can be created for the dictionary values.
+    pub(crate) fn new(array: &'a dyn Array, options: &'a CastOptions) -> Result<Self, ArrowError> {
+        let dictionary = array.as_any_dictionary();
+        let dictionary_values = dictionary.values();
+        let nested_builder = make_arrow_to_variant_row_builder(
+            dictionary_values.data_type(),
+            dictionary_values.as_ref(),
+            options,
+        )?;
+
+        // WARNING: normalized_keys panics if values is empty, so skip the call in that case.
+        let normalized_keys = if dictionary_values.len() == 0 {
+            Vec::new()
+        } else {
+            dictionary.normalized_keys()
+        };
+
+        Ok(Self {
+            keys: dictionary.keys(),
+            normalized_keys,
+            values_builder: Box::new(nested_builder),
+        })
+    }
+
+    /// Appends the value at row `index` to `builder`.
+    ///
+    /// A null key is written as a variant null; any other key is resolved to its dictionary
+    /// entry, which is written by the values builder.
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        index: usize,
+    ) -> Result<(), ArrowError> {
+        if self.keys.is_null(index) {
+            builder.append_null();
+            return Ok(());
+        }
+
+        let value_index = self.normalized_keys[index];
+        self.values_builder.append_row(builder, value_index)?;
+        Ok(())
+    }
+}
+
+/// Run-end encoded array builder with efficient sequential access
+///
+/// The builder remembers the run that served the previous row, so reading rows in order only
+/// needs a comparison (or a single step to the next run). Any other access pattern falls back
+/// to a binary search over the run ends.
+pub(crate) struct RunEndEncodedArrowToVariantBuilder<'a, R: RunEndIndexType> {
+    run_array: &'a arrow::array::RunArray<R>,
+    values_builder: Box<ArrowToVariantRowBuilder<'a>>,
+
+    run_ends: &'a [R::Native], // Raw run ends; the array's slice offset is not applied to them
+    run_number: usize,         // Physical index into run_ends and values
+    run_start: usize,          // Start of the current run, in the same coordinates as run_ends
+}
+
+impl<'a, R: RunEndIndexType> RunEndEncodedArrowToVariantBuilder<'a, R> {
+    /// Creates a run-end encoded row builder over `array`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if `array` is not a `RunArray<R>` or if no row builder can be created
+    /// for the run values.
+    pub(crate) fn new(array: &'a dyn Array, options: &'a CastOptions) -> Result<Self, ArrowError> {
+        let Some(run_array) = array.as_run_opt() else {
+            return Err(ArrowError::CastError("Expected RunArray".to_string()));
+        };
+
+        let values = run_array.values();
+        let values_builder =
+            make_arrow_to_variant_row_builder(values.data_type(), values.as_ref(), options)?;
+
+        Ok(Self {
+            run_array,
+            values_builder: Box::new(values_builder),
+            run_ends: run_array.run_ends().values(),
+            run_number: 0,
+            run_start: 0,
+        })
+    }
+
+    /// Positions `run_number` / `run_start` on the run that contains logical row `index`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if `index` is not a valid row of the run array.
+    fn set_run_for_index(&mut self, index: usize) -> Result<(), ArrowError> {
+        let beyond_array = || ArrowError::CastError(format!("Index {index} beyond run array"));
+
+        // Reject out-of-range rows up front; this also guarantees that the fast path below can
+        // never step past the last run.
+        let logical_len = self.run_array.run_ends().len();
+        if index >= logical_len {
+            return Err(beyond_array());
+        }
+
+        // Slicing a run array only adjusts its offset and length, while `run_ends` and the
+        // values stay unsliced. Shift the row into the coordinates used by the raw run ends.
+        let position = index + self.run_array.run_ends().offset();
+
+        if position >= self.run_start {
+            if let Some(run_end) = self.run_ends.get(self.run_number) {
+                let run_end = run_end.as_usize();
+                if position < run_end {
+                    // Still inside the current run.
+                    return Ok(());
+                }
+                if position == run_end && self.run_number + 1 < self.run_ends.len() {
+                    // First row of the next run: the common case for sequential access.
+                    self.run_number += 1;
+                    self.run_start = run_end;
+                    return Ok(());
+                }
+            }
+        }
+
+        // Use partition_point for all non-sequential cases
+        let run_number = self
+            .run_ends
+            .partition_point(|&run_end| run_end.as_usize() <= position);
+        if run_number >= self.run_ends.len() {
+            return Err(beyond_array());
+        }
+        self.run_number = run_number;
+        self.run_start = match run_number {
+            0 => 0,
+            _ => self.run_ends[run_number - 1].as_usize(),
+        };
+        Ok(())
+    }
+
+    /// Appends the value of logical row `index` to `builder`.
+    ///
+    /// A null run value is written as a variant null; otherwise the run's value is written by
+    /// the values builder.
+    fn append_row(
+        &mut self,
+        builder: &mut impl VariantBuilderExt,
+        index: usize,
+    ) -> Result<(), ArrowError> {
+        self.set_run_for_index(index)?;
+
+        // Handle null values
+        if self.run_array.values().is_null(self.run_number) {
+            builder.append_null();
+            return Ok(());
+        }
+
+        // Re-encode the value
+        self.values_builder.append_row(builder, self.run_number)?;
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{VariantArray, VariantArrayBuilder};
+    use arrow::array::{ArrayRef, BooleanArray, Int32Array, StringArray};
+    use std::sync::Arc;
+
+    /// Runs the row builder over every row of `array`, in order, and returns the resulting
+    /// `VariantArray`. Also asserts that the output has as many rows as the input.
+    fn execute_row_builder_test(array: &dyn Array) -> VariantArray {
+        let row_count = array.len();
+        let cast_options = CastOptions::default();
+        let mut rows =
+            make_arrow_to_variant_row_builder(array.data_type(), array, &cast_options).unwrap();
+        let mut output = VariantArrayBuilder::new(row_count);
+
+        // Each row gets its own variant builder, finished as soon as the row is written.
+        (0..row_count).for_each(|row| {
+            let mut variant_builder = output.variant_builder();
+            rows.append_row(&mut variant_builder, row).unwrap();
+            variant_builder.finish();
+        });
+
+        let result = output.build();
+        assert_eq!(result.len(), row_count);
+        result
+    }
+
+    /// Generic helper function to test row builders with basic assertion patterns.
+    ///
+    /// Converts `array` with `execute_row_builder_test` and compares every row against
+    /// `expected_values`: `Some(v)` means the row must equal `v`, `None` means the row must
+    /// be null. `expected_values` must contain exactly one entry per row of `array`.
+    fn test_row_builder_basic(array: &dyn Array, expected_values: Vec<Option<Variant>>) {
+        let variant_array = execute_row_builder_test(array);
+
+        // Without this check a too-short expectation list would silently skip trailing rows.
+        assert_eq!(
+            variant_array.len(),
+            expected_values.len(),
+            "Expected values must cover every row"
+        );
+
+        for (i, expected) in expected_values.iter().enumerate() {
+            match expected {
+                Some(variant) => {
+                    assert_eq!(variant_array.value(i), *variant, "Mismatch at index {}", i)
+                }
+                None => assert!(variant_array.is_null(i), "Expected null at index {}", i),
+            }
+        }
+    }
+
+    #[test]
+    fn test_primitive_row_builder() {
+        // Int32 input with a null in the middle.
+        let input = Int32Array::from(vec![Some(42), None, Some(100)]);
+        let expected = vec![Some(Variant::Int32(42)), None, Some(Variant::Int32(100))];
+        test_row_builder_basic(&input, expected);
+    }
+
+    #[test]
+    fn test_string_row_builder() {
+        // Utf8 input with a null in the middle.
+        let input = StringArray::from(vec![Some("hello"), None, Some("world")]);
+        let expected = vec![
+            Some(Variant::from("hello")),
+            None,
+            Some(Variant::from("world")),
+        ];
+        test_row_builder_basic(&input, expected);
+    }
+
+    #[test]
+    fn test_boolean_row_builder() {
+        // Boolean input with a null in the middle.
+        let input = BooleanArray::from(vec![Some(true), None, Some(false)]);
+        let expected = vec![Some(Variant::from(true)), None, Some(Variant::from(false))];
+        test_row_builder_basic(&input, expected);
+    }
+
+    #[test]
+    fn test_struct_row_builder() {
+        use arrow::array::{ArrayRef, Int32Array, StringArray, StructArray};
+        use arrow_schema::{DataType, Field};
+        use std::sync::Arc;
+
+        // Create a struct array with int and string fields
+        let int_field = Field::new("id", DataType::Int32, true);
+        let string_field = Field::new("name", DataType::Utf8, true);
+
+        let int_array = Int32Array::from(vec![Some(1), None, Some(3)]);
+        let string_array = StringArray::from(vec![Some("Alice"), Some("Bob"), 
None]);
+
+        let struct_array = StructArray::try_new(
+            vec![int_field, string_field].into(),
+            vec![
+                Arc::new(int_array) as ArrayRef,
+                Arc::new(string_array) as ArrayRef,
+            ],
+            None,
+        )
+        .unwrap();
+
+        let variant_array = execute_row_builder_test(&struct_array);
+
+        // Check first row - should have both fields
+        let first_variant = variant_array.value(0);
+        assert_eq!(first_variant.get_object_field("id"), 
Some(Variant::from(1)));
+        assert_eq!(
+            first_variant.get_object_field("name"),
+            Some(Variant::from("Alice"))
+        );
+
+        // Check second row - should have name field but not id (null field 
omitted)
+        let second_variant = variant_array.value(1);
+        assert_eq!(second_variant.get_object_field("id"), None); // null field 
omitted
+        assert_eq!(
+            second_variant.get_object_field("name"),
+            Some(Variant::from("Bob"))
+        );
+
+        // Check third row - should have id field but not name (null field 
omitted)
+        let third_variant = variant_array.value(2);
+        assert_eq!(third_variant.get_object_field("id"), 
Some(Variant::from(3)));
+        assert_eq!(third_variant.get_object_field("name"), None); // null 
field omitted
+    }
+
+    #[test]
+    fn test_run_end_encoded_row_builder() {
+        use arrow::array::{Int32Array, RunArray};
+        use arrow::datatypes::Int32Type;
+
+        // Logical contents [A, A, B, B, B, C], stored as three runs:
+        //   run_ends = [2, 5, 6], values = ["A", "B", "C"]
+        let run_values = StringArray::from(vec!["A", "B", "C"]);
+        let run_ends = Int32Array::from(vec![2, 5, 6]);
+        let run_array = RunArray::<Int32Type>::try_new(&run_ends, &run_values).unwrap();
+
+        let variant_array = execute_row_builder_test(&run_array);
+
+        // Every logical row must decode to the value of the run that covers it.
+        let expected = ["A", "A", "B", "B", "B", "C"];
+        for (row, text) in expected.iter().enumerate() {
+            assert_eq!(variant_array.value(row), Variant::from(*text));
+        }
+    }
+
+    #[test]
+    fn test_run_end_encoded_random_access() {
+        use arrow::array::{Int32Array, RunArray};
+        use arrow::datatypes::Int32Type;
+
+        // Logical contents: [A, A, B, B, B, C]
+        let run_values = StringArray::from(vec!["A", "B", "C"]);
+        let run_ends = Int32Array::from(vec![2, 5, 6]);
+        let run_array = RunArray::<Int32Type>::try_new(&run_ends, &run_values).unwrap();
+
+        let options = CastOptions::default();
+        let mut row_builder =
+            make_arrow_to_variant_row_builder(run_array.data_type(), &run_array, &options).unwrap();
+
+        // (row to read, value expected there) - the order mixes forward and backward jumps
+        // while reusing one row builder throughout.
+        let probes = [(0, "A"), (5, "C"), (2, "B"), (4, "B"), (1, "A"), (3, "B")];
+
+        for (index, expected) in probes {
+            let mut single_row = VariantArrayBuilder::new(1);
+            let mut variant_builder = single_row.variant_builder();
+            row_builder.append_row(&mut variant_builder, index).unwrap();
+            variant_builder.finish();
+
+            let variant_array = single_row.build();
+            assert_eq!(variant_array.value(0), Variant::from(expected));
+        }
+    }
+
+    #[test]
+    fn test_run_end_encoded_with_nulls() {
+        use arrow::array::{Int32Array, RunArray};
+        use arrow::datatypes::Int32Type;
+
+        // Create a run-end encoded array with null values: [A, A, null, null, 
B]
+        let values = StringArray::from(vec![Some("A"), None, Some("B")]);
+        let run_ends = Int32Array::from(vec![2, 4, 5]);
+        let run_array = RunArray::<Int32Type>::try_new(&run_ends, 
&values).unwrap();
+
+        let options = CastOptions::default();
+        let mut row_builder =
+            make_arrow_to_variant_row_builder(run_array.data_type(), 
&run_array, &options).unwrap();
+        let mut array_builder = VariantArrayBuilder::new(5);
+
+        // Test sequential access
+        for i in 0..5 {
+            let mut variant_builder = array_builder.variant_builder();
+            row_builder.append_row(&mut variant_builder, i).unwrap();
+            variant_builder.finish();
+        }
+
+        let variant_array = array_builder.build();
+        assert_eq!(variant_array.len(), 5);
+
+        // Verify the values
+        assert_eq!(variant_array.value(0), Variant::from("A")); // Run 0
+        assert_eq!(variant_array.value(1), Variant::from("A")); // Run 0
+        assert!(variant_array.is_null(2)); // Run 1 (null)
+        assert!(variant_array.is_null(3)); // Run 1 (null)
+        assert_eq!(variant_array.value(4), Variant::from("B")); // Run 2
+    }
+
+    #[test]
+    fn test_dictionary_row_builder() {
+        use arrow::array::{DictionaryArray, Int32Array};
+        use arrow::datatypes::Int32Type;
+
+        // Create a dictionary array: keys=[0, 1, 0, 2, 1], values=["apple", 
"banana", "cherry"]
+        let values = StringArray::from(vec!["apple", "banana", "cherry"]);
+        let keys = Int32Array::from(vec![0, 1, 0, 2, 1]);
+        let dict_array = DictionaryArray::<Int32Type>::try_new(keys, 
Arc::new(values)).unwrap();
+
+        let variant_array = execute_row_builder_test(&dict_array);
+
+        // Verify the values match the dictionary lookup
+        assert_eq!(variant_array.value(0), Variant::from("apple")); // keys[0] 
= 0 -> values[0] = "apple"
+        assert_eq!(variant_array.value(1), Variant::from("banana")); // 
keys[1] = 1 -> values[1] = "banana"
+        assert_eq!(variant_array.value(2), Variant::from("apple")); // keys[2] 
= 0 -> values[0] = "apple"
+        assert_eq!(variant_array.value(3), Variant::from("cherry")); // 
keys[3] = 2 -> values[2] = "cherry"
+        assert_eq!(variant_array.value(4), Variant::from("banana")); // 
keys[4] = 1 -> values[1] = "banana"
+    }
+
+    #[test]
+    fn test_dictionary_with_nulls() {
+        use arrow::array::{DictionaryArray, Int32Array};
+        use arrow::datatypes::Int32Type;
+
+        // Create a dictionary array with null keys: keys=[0, null, 1, null, 
2], values=["x", "y", "z"]
+        let values = StringArray::from(vec!["x", "y", "z"]);
+        let keys = Int32Array::from(vec![Some(0), None, Some(1), None, 
Some(2)]);
+        let dict_array = DictionaryArray::<Int32Type>::try_new(keys, 
Arc::new(values)).unwrap();
+
+        let options = CastOptions::default();
+        let mut row_builder =
+            make_arrow_to_variant_row_builder(dict_array.data_type(), 
&dict_array, &options)
+                .unwrap();
+        let mut array_builder = VariantArrayBuilder::new(5);
+
+        // Test sequential access
+        for i in 0..5 {
+            let mut variant_builder = array_builder.variant_builder();
+            row_builder.append_row(&mut variant_builder, i).unwrap();
+            variant_builder.finish();
+        }
+
+        let variant_array = array_builder.build();
+        assert_eq!(variant_array.len(), 5);
+
+        // Verify the values and nulls
+        assert_eq!(variant_array.value(0), Variant::from("x")); // keys[0] = 0 
-> values[0] = "x"
+        assert!(variant_array.is_null(1)); // keys[1] = null
+        assert_eq!(variant_array.value(2), Variant::from("y")); // keys[2] = 1 
-> values[1] = "y"
+        assert!(variant_array.is_null(3)); // keys[3] = null
+        assert_eq!(variant_array.value(4), Variant::from("z")); // keys[4] = 2 
-> values[2] = "z"
+    }
+
+    #[test]
+    fn test_dictionary_random_access() {
+        use arrow::array::{DictionaryArray, Int32Array};
+        use arrow::datatypes::Int32Type;
+
+        // Create a dictionary array: keys=[0, 1, 2, 0, 1, 2], values=["red", 
"green", "blue"]
+        let values = StringArray::from(vec!["red", "green", "blue"]);
+        let keys = Int32Array::from(vec![0, 1, 2, 0, 1, 2]);
+        let dict_array = DictionaryArray::<Int32Type>::try_new(keys, 
Arc::new(values)).unwrap();
+
+        let options = CastOptions::default();
+        let mut row_builder =
+            make_arrow_to_variant_row_builder(dict_array.data_type(), 
&dict_array, &options)
+                .unwrap();
+
+        // Test random access pattern
+        let access_pattern = [5, 0, 3, 1, 4, 2]; // Random order
+        let expected_values = ["blue", "red", "red", "green", "green", "blue"];
+
+        for (i, &index) in access_pattern.iter().enumerate() {
+            let mut array_builder = VariantArrayBuilder::new(1);
+            let mut variant_builder = array_builder.variant_builder();
+            row_builder.append_row(&mut variant_builder, index).unwrap();
+            variant_builder.finish();
+
+            let variant_array = array_builder.build();
+            assert_eq!(variant_array.value(0), 
Variant::from(expected_values[i]));
+        }
+    }
+
+    #[test]
+    fn test_nested_dictionary() {
+        use arrow::array::{DictionaryArray, Int32Array, StructArray};
+        use arrow::datatypes::{Field, Int32Type};
+
+        // Create a dictionary with struct values
+        let id_array = Int32Array::from(vec![1, 2, 3]);
+        let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
+        let struct_array = StructArray::from(vec![
+            (
+                Arc::new(Field::new("id", DataType::Int32, false)),
+                Arc::new(id_array) as ArrayRef,
+            ),
+            (
+                Arc::new(Field::new("name", DataType::Utf8, false)),
+                Arc::new(name_array) as ArrayRef,
+            ),
+        ]);
+
+        let keys = Int32Array::from(vec![0, 1, 0, 2, 1]);
+        let dict_array =
+            DictionaryArray::<Int32Type>::try_new(keys, Arc::new(struct_array)).unwrap();
+
+        let options = CastOptions::default();
+        let mut row_builder =
+            make_arrow_to_variant_row_builder(dict_array.data_type(), &dict_array, &options)
+                .unwrap();
+        let mut array_builder = VariantArrayBuilder::new(5);
+
+        // Test sequential access
+        for i in 0..5 {
+            let mut variant_builder = array_builder.variant_builder();
+            row_builder.append_row(&mut variant_builder, i).unwrap();
+            variant_builder.finish();
+        }
+
+        let variant_array = array_builder.build();
+        assert_eq!(variant_array.len(), 5);
+
+        // Verify the nested struct of every row, i.e. the struct selected by keys[row].
+        // Rows 2 and 4 repeat earlier keys and must yield the same objects again; row 3 is
+        // the only row that uses key 2.
+        let expected: [(i32, &str); 5] = [
+            (1, "Alice"),
+            (2, "Bob"),
+            (1, "Alice"),
+            (3, "Charlie"),
+            (2, "Bob"),
+        ];
+        for (row, &(id, name)) in expected.iter().enumerate() {
+            let variant = variant_array.value(row);
+            assert_eq!(
+                variant.get_object_field("id"),
+                Some(Variant::from(id)),
+                "Mismatch in id at row {row}"
+            );
+            assert_eq!(
+                variant.get_object_field("name"),
+                Some(Variant::from(name)),
+                "Mismatch in name at row {row}"
+            );
+        }
+    }
+
+    #[test]
+    fn test_list_row_builder() {
+        use arrow::array::ListArray;
+
+        // Input rows: [[1, 2], [3, 4, 5], null, []]
+        let list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(1), Some(2)]),
+            Some(vec![Some(3), Some(4), Some(5)]),
+            None,
+            Some(vec![]),
+        ]);
+
+        let variant_array = execute_row_builder_test(&list_array);
+
+        // `None` is a null row; `Some(items)` is a variant list with exactly those elements.
+        // The empty list in the last row must come out as an empty list, not as null.
+        let expected: [Option<Vec<i32>>; 4] =
+            [Some(vec![1, 2]), Some(vec![3, 4, 5]), None, Some(vec![])];
+
+        for (row, expected_row) in expected.iter().enumerate() {
+            match expected_row {
+                None => assert!(variant_array.is_null(row)),
+                Some(items) => {
+                    let variant = variant_array.value(row);
+                    let list = variant.as_list().unwrap();
+                    assert_eq!(list.len(), items.len());
+                    for (position, item) in items.iter().enumerate() {
+                        assert_eq!(list.get(position), Some(Variant::from(*item)));
+                    }
+                }
+            }
+        }
+    }
+
+    #[test]
+    fn test_sliced_list_row_builder() {
+        use arrow::array::ListArray;
+
+        // Full array: [[1, 2], [3, 4, 5], [6]]
+        let full_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(1), Some(2)]),
+            Some(vec![Some(3), Some(4), Some(5)]),
+            Some(vec![Some(6)]),
+        ]);
+
+        // Keep only the middle row, so row 0 of the slice is [3, 4, 5].
+        let sliced_array = full_array.slice(1, 1);
+
+        let options = CastOptions::default();
+        let mut row_builder =
+            make_arrow_to_variant_row_builder(sliced_array.data_type(), &sliced_array, &options)
+                .unwrap();
+        let mut output = VariantArrayBuilder::new(sliced_array.len());
+
+        // The slice has a single row.
+        let mut variant_builder = output.variant_builder();
+        row_builder.append_row(&mut variant_builder, 0).unwrap();
+        variant_builder.finish();
+
+        let variant_array = output.build();
+        assert_eq!(variant_array.len(), 1);
+
+        // Row 0 of the slice must be the middle list of the full array.
+        let variant = variant_array.value(0);
+        let list = variant.as_list().unwrap();
+        let expected = [3, 4, 5];
+        assert_eq!(list.len(), expected.len());
+        for (position, item) in expected.iter().enumerate() {
+            assert_eq!(list.get(position), Some(Variant::from(*item)));
+        }
+    }
+
+    #[test]
+    fn test_nested_list_row_builder() {
+        use arrow::array::ListArray;
+        use arrow::datatypes::Field;
+
+        // Inner lists [[1, 2], [3]] become the child array of the outer list.
+        let inner_lists = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+            Some(vec![Some(1), Some(2)]),
+            Some(vec![Some(3)]),
+        ]);
+
+        // Outer list of two rows: row 0 spans both inner lists, row 1 is null.
+        let item_field = Arc::new(Field::new("item", DataType::Int32, true));
+        let outer_field = Arc::new(Field::new("item", DataType::List(item_field), true));
+        let outer_list = ListArray::new(
+            outer_field,
+            arrow::buffer::OffsetBuffer::new(vec![0i32, 2, 2].into()),
+            Arc::new(inner_lists),
+            Some(arrow::buffer::NullBuffer::from(vec![true, false])),
+        );
+
+        let options = CastOptions::default();
+        let mut row_builder =
+            make_arrow_to_variant_row_builder(outer_list.data_type(), &outer_list, &options)
+                .unwrap();
+        let mut output = VariantArrayBuilder::new(outer_list.len());
+
+        for row in 0..outer_list.len() {
+            let mut variant_builder = output.variant_builder();
+            row_builder.append_row(&mut variant_builder, row).unwrap();
+            variant_builder.finish();
+        }
+
+        let variant_array = output.build();
+        assert_eq!(variant_array.len(), 2);
+
+        // Row 0: [[1, 2], [3]]
+        let row0 = variant_array.value(0);
+        let outer = row0.as_list().unwrap();
+        let expected_inner: [&[i32]; 2] = [&[1, 2], &[3]];
+        assert_eq!(outer.len(), expected_inner.len());
+        for (position, expected) in expected_inner.iter().enumerate() {
+            let element = outer.get(position).unwrap();
+            let inner = element.as_list().unwrap();
+            assert_eq!(inner.len(), expected.len());
+            for (i, item) in expected.iter().enumerate() {
+                assert_eq!(inner.get(i), Some(Variant::from(*item)));
+            }
+        }
+
+        // Row 1: null
+        assert!(variant_array.is_null(1));
+    }
+
+    #[test]
+    fn test_map_row_builder() {
+        use arrow::array::{Int32Array, MapArray, StringArray, StructArray};
+        use arrow::buffer::{NullBuffer, OffsetBuffer};
+        use arrow::datatypes::{DataType, Field, Fields};
+        use std::sync::Arc;
+
+        // Flattened key-value entries shared by all maps.
+        let entry_keys = StringArray::from(vec!["key1", "key2", "key3"]);
+        let entry_values = Int32Array::from(vec![1, 2, 3]);
+        let entry_fields = Fields::from(vec![
+            Field::new("key", DataType::Utf8, false),
+            Field::new("value", DataType::Int32, true),
+        ]);
+        let entries = StructArray::new(
+            entry_fields.clone(),
+            vec![Arc::new(entry_keys), Arc::new(entry_values)],
+            None, // the entries themselves are never null
+        );
+
+        // Four maps over the entries: [0..1], [1..1], [1..1], [1..3]
+        //   map 0: {"key1": 1}
+        //   map 1: {}                       (empty)
+        //   map 2: null                     (no entries AND marked null below)
+        //   map 3: {"key2": 2, "key3": 3}
+        let offsets = OffsetBuffer::new(vec![0, 1, 1, 1, 3].into());
+        let validity = Some(NullBuffer::from(vec![true, true, false, true]));
+
+        // The entries struct field of the map is declared non-nullable.
+        let entries_field = Arc::new(Field::new(
+            "entries",
+            DataType::Struct(entry_fields),
+            false,
+        ));
+
+        let map_array = MapArray::try_new(
+            entries_field,
+            offsets,
+            entries,
+            validity,
+            false, // not ordered
+        )
+        .unwrap();
+
+        let variant_array = execute_row_builder_test(&map_array);
+
+        // `None` is a null row; `Some(pairs)` is an object with exactly those fields. The
+        // empty map must come out as an empty object, not as null.
+        let expected: [Option<Vec<(&str, i32)>>; 4] = [
+            Some(vec![("key1", 1)]),
+            Some(vec![]),
+            None,
+            Some(vec![("key2", 2), ("key3", 3)]),
+        ];
+        for (row, expected_map) in expected.iter().enumerate() {
+            match expected_map {
+                None => assert!(variant_array.is_null(row)),
+                Some(pairs) => {
+                    let variant = variant_array.value(row);
+                    let object = variant.as_object().unwrap();
+                    assert_eq!(object.len(), pairs.len());
+                    for (key, value) in pairs.iter() {
+                        assert_eq!(object.get(*key), Some(Variant::from(*value)));
+                    }
+                }
+            }
+        }
+    }
+
+    #[test]
+    fn test_union_sparse_row_builder() {
+        use arrow::array::{Float64Array, Int32Array, StringArray, UnionArray};
+        use arrow::buffer::ScalarBuffer;
+        use arrow::datatypes::{DataType, Field, UnionFields};
+        use std::sync::Arc;
+
+        // Sparse union: every child has one slot per row, and the row's type id picks which
+        // child is read. Row 5 selects the int child, whose slot there is null.
+        let children: Vec<Arc<dyn Array>> = vec![
+            Arc::new(Int32Array::from(vec![
+                Some(1),
+                None,
+                None,
+                None,
+                Some(34),
+                None,
+            ])),
+            Arc::new(Float64Array::from(vec![
+                None,
+                Some(3.2),
+                None,
+                Some(32.5),
+                None,
+                None,
+            ])),
+            Arc::new(StringArray::from(vec![
+                None,
+                None,
+                Some("hello"),
+                None,
+                None,
+                None,
+            ])),
+        ];
+        let type_ids: ScalarBuffer<i8> = [0, 1, 2, 1, 0, 0].into_iter().collect();
+
+        let union_fields = UnionFields::new(
+            vec![0, 1, 2],
+            vec![
+                Field::new("int_field", DataType::Int32, false),
+                Field::new("float_field", DataType::Float64, false),
+                Field::new("string_field", DataType::Utf8, false),
+            ],
+        );
+
+        // No offsets buffer => sparse union
+        let union_array = UnionArray::try_new(union_fields, type_ids, None, children).unwrap();
+
+        test_row_builder_basic(
+            &union_array,
+            vec![
+                Some(Variant::Int32(1)),
+                Some(Variant::Double(3.2)),
+                Some(Variant::from("hello")),
+                Some(Variant::Double(32.5)),
+                Some(Variant::Int32(34)),
+                None,
+            ],
+        );
+    }
+
+    #[test]
+    fn test_union_dense_row_builder() {
+        use arrow::array::{Float64Array, Int32Array, StringArray, UnionArray};
+        use arrow::buffer::ScalarBuffer;
+        use arrow::datatypes::{DataType, Field, UnionFields};
+        use std::sync::Arc;
+
+        // Create a dense union array with mixed types (int, float, string)
+        let int_array = Int32Array::from(vec![Some(1), Some(34), None]);
+        let float_array = Float64Array::from(vec![3.2, 32.5]);
+        let string_array = StringArray::from(vec!["hello"]);
+        let type_ids = [0, 1, 2, 1, 0, 
0].into_iter().collect::<ScalarBuffer<i8>>();
+        let offsets = [0, 0, 0, 1, 1, 2]
+            .into_iter()
+            .collect::<ScalarBuffer<i32>>();
+
+        let union_fields = UnionFields::new(
+            vec![0, 1, 2],
+            vec![
+                Field::new("int_field", DataType::Int32, false),
+                Field::new("float_field", DataType::Float64, false),
+                Field::new("string_field", DataType::Utf8, false),
+            ],
+        );
+
+        let children: Vec<Arc<dyn Array>> = vec![
+            Arc::new(int_array),
+            Arc::new(float_array),
+            Arc::new(string_array),
+        ];
+
+        let union_array = UnionArray::try_new(
+            union_fields,
+            type_ids,
+            Some(offsets), // Dense union
+            children,
+        )
+        .unwrap();
+
+        // Test the row builder
+        let options = CastOptions::default();
+        let mut row_builder =
+            make_arrow_to_variant_row_builder(union_array.data_type(), &union_array, &options)
+                .unwrap();
+
+        let mut variant_builder = VariantArrayBuilder::new(union_array.len());
+        for i in 0..union_array.len() {
+            let mut builder = variant_builder.variant_builder();
+            row_builder.append_row(&mut builder, i).unwrap();
+            builder.finish();
+        }
+        let variant_array = variant_builder.build();
+
+        assert_eq!(variant_array.len(), 6);
+        assert_eq!(variant_array.value(0), Variant::Int32(1));
+        assert_eq!(variant_array.value(1), Variant::Double(3.2));
+        assert_eq!(variant_array.value(2), Variant::from("hello"));
+        assert_eq!(variant_array.value(3), Variant::Double(32.5));
+        assert_eq!(variant_array.value(4), Variant::Int32(34));
+        assert!(variant_array.is_null(5));
+    }
+
+    #[test]
+    fn test_union_sparse_type_ids_row_builder() {
+        use arrow::array::{Int32Array, StringArray, UnionArray};
+        use arrow::buffer::ScalarBuffer;
+        use arrow::datatypes::{DataType, Field, UnionFields};
+        use std::sync::Arc;
+
+        // Create a sparse union with non-contiguous type IDs (1, 3)
+        let int_array = Int32Array::from(vec![Some(42), None]);
+        let string_array = StringArray::from(vec![None, Some("test")]);
+        let type_ids = [1, 3].into_iter().collect::<ScalarBuffer<i8>>();
+
+        let union_fields = UnionFields::new(
+            vec![1, 3], // Non-contiguous type IDs
+            vec![
+                Field::new("int_field", DataType::Int32, false),
+                Field::new("string_field", DataType::Utf8, false),
+            ],
+        );
+
+        let children: Vec<Arc<dyn Array>> = vec![Arc::new(int_array), Arc::new(string_array)];
+
+        let union_array = UnionArray::try_new(
+            union_fields,
+            type_ids,
+            None, // Sparse union
+            children,
+        )
+        .unwrap();
+
+        // Test the row builder
+        let options = CastOptions::default();
+        let mut row_builder =
+            make_arrow_to_variant_row_builder(union_array.data_type(), &union_array, &options)
+                .unwrap();
+
+        let mut variant_builder = VariantArrayBuilder::new(union_array.len());
+        for i in 0..union_array.len() {
+            let mut builder = variant_builder.variant_builder();
+            row_builder.append_row(&mut builder, i).unwrap();
+            builder.finish();
+        }
+        let variant_array = variant_builder.build();
+
+        // Verify results
+        assert_eq!(variant_array.len(), 2);
+
+        // Row 0: int 42 (type_id = 1)
+        assert_eq!(variant_array.value(0), Variant::Int32(42));
+
+        // Row 1: string "test" (type_id = 3)
+        assert_eq!(variant_array.value(1), Variant::from("test"));
+    }
+
+    #[test]
+    fn test_decimal32_row_builder() {
+        use arrow::array::Decimal32Array;
+        use parquet_variant::VariantDecimal4;
+
+        // Test Decimal32Array with scale 2 (e.g., for currency: 12.34)
+        let decimal_array = Decimal32Array::from(vec![Some(1234), None, Some(-5678)])
+            .with_precision_and_scale(9, 2)
+            .unwrap();
+
+        test_row_builder_basic(
+            &decimal_array,
+            vec![
+                Some(Variant::from(VariantDecimal4::try_new(1234, 2).unwrap())),
+                None,
+                Some(Variant::from(VariantDecimal4::try_new(-5678, 2).unwrap())),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_decimal128_row_builder() {
+        use arrow::array::Decimal128Array;
+        use parquet_variant::VariantDecimal16;
+
+        // Test Decimal128Array with negative scale (multiply by 10^|scale|)
+        let decimal_array = Decimal128Array::from(vec![Some(123), None, Some(456)])
+            .with_precision_and_scale(10, -2)
+            .unwrap();
+
+        test_row_builder_basic(
+            &decimal_array,
+            vec![
+                Some(Variant::from(VariantDecimal16::try_new(12300, 0).unwrap())),
+                None,
+                Some(Variant::from(VariantDecimal16::try_new(45600, 0).unwrap())),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_decimal256_overflow_row_builder() {
+        use arrow::array::Decimal256Array;
+        use arrow::datatypes::i256;
+
+        // Test Decimal256Array with a value that overflows i128
+        let large_value = i256::from_i128(i128::MAX) + i256::from(1); // Overflows i128
+        let decimal_array = Decimal256Array::from(vec![Some(large_value), Some(i256::from(123))])
+            .with_precision_and_scale(76, 3)
+            .unwrap();
+
+        test_row_builder_basic(
+            &decimal_array,
+            vec![
+                Some(Variant::Null), // Overflow value becomes Null
+                Some(Variant::from(VariantDecimal16::try_new(123, 3).unwrap())),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_binary_row_builder() {
+        use arrow::array::BinaryArray;
+
+        let binary_data = vec![
+            Some(b"hello".as_slice()),
+            None,
+            Some(b"\x00\x01\x02\xFF".as_slice()),
+            Some(b"".as_slice()), // Empty binary
+        ];
+        let binary_array = BinaryArray::from(binary_data);
+
+        test_row_builder_basic(
+            &binary_array,
+            vec![
+                Some(Variant::from(b"hello".as_slice())),
+                None,
+                Some(Variant::from([0x00, 0x01, 0x02, 0xFF].as_slice())),
+                Some(Variant::from([].as_slice())),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_binary_view_row_builder() {
+        use arrow::array::BinaryViewArray;
+
+        let binary_data = vec![
+            Some(b"short".as_slice()),
+            None,
+            Some(b"this is a longer binary view that exceeds inline storage".as_slice()),
+        ];
+        let binary_view_array = BinaryViewArray::from(binary_data);
+
+        test_row_builder_basic(
+            &binary_view_array,
+            vec![
+                Some(Variant::from(b"short".as_slice())),
+                None,
+                Some(Variant::from(
+                    b"this is a longer binary view that exceeds inline storage".as_slice(),
+                )),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_fixed_size_binary_row_builder() {
+        use arrow::array::FixedSizeBinaryArray;
+
+        let binary_data = vec![
+            Some([0x01, 0x02, 0x03, 0x04]),
+            None,
+            Some([0xFF, 0xFE, 0xFD, 0xFC]),
+        ];
+        let fixed_binary_array =
+            FixedSizeBinaryArray::try_from_sparse_iter_with_size(binary_data.into_iter(), 4)
+                .unwrap();
+
+        test_row_builder_basic(
+            &fixed_binary_array,
+            vec![
+                Some(Variant::from([0x01, 0x02, 0x03, 0x04].as_slice())),
+                None,
+                Some(Variant::from([0xFF, 0xFE, 0xFD, 0xFC].as_slice())),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_utf8_view_row_builder() {
+        use arrow::array::StringViewArray;
+
+        let string_data = vec![
+            Some("short"),
+            None,
+            Some("this is a much longer string that will be stored out-of-line in the buffer"),
+        ];
+        let string_view_array = StringViewArray::from(string_data);
+
+        test_row_builder_basic(
+            &string_view_array,
+            vec![
+                Some(Variant::from("short")),
+                None,
+                Some(Variant::from(
+                    "this is a much longer string that will be stored out-of-line in the buffer",
+                )),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_timestamp_second_row_builder() {
+        use arrow::array::TimestampSecondArray;
+
+        let timestamp_data = vec![
+            Some(1609459200), // 2021-01-01 00:00:00 UTC
+            None,
+            Some(1640995200), // 2022-01-01 00:00:00 UTC
+        ];
+        let timestamp_array = TimestampSecondArray::from(timestamp_data);
+
+        let expected_naive1 = DateTime::from_timestamp(1609459200, 0).unwrap().naive_utc();
+        let expected_naive2 = DateTime::from_timestamp(1640995200, 0).unwrap().naive_utc();
+
+        test_row_builder_basic(
+            &timestamp_array,
+            vec![
+                Some(Variant::from(expected_naive1)),
+                None,
+                Some(Variant::from(expected_naive2)),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_timestamp_with_timezone_row_builder() {
+        use arrow::array::TimestampMicrosecondArray;
+        use chrono::DateTime;
+
+        let timestamp_data = vec![
+            Some(1609459200000000), // 2021-01-01 00:00:00 UTC (in microseconds)
+            None,
+            Some(1640995200000000), // 2022-01-01 00:00:00 UTC (in microseconds)
+        ];
+        let timezone = "UTC".to_string();
+        let timestamp_array =
+            TimestampMicrosecondArray::from(timestamp_data).with_timezone(timezone);
+
+        let expected_utc1 = DateTime::from_timestamp(1609459200, 0).unwrap();
+        let expected_utc2 = DateTime::from_timestamp(1640995200, 0).unwrap();
+
+        test_row_builder_basic(
+            &timestamp_array,
+            vec![
+                Some(Variant::from(expected_utc1)),
+                None,
+                Some(Variant::from(expected_utc2)),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_timestamp_nanosecond_precision_row_builder() {
+        use arrow::array::TimestampNanosecondArray;
+
+        let timestamp_data = vec![
+            Some(1609459200123456789), // 2021-01-01 00:00:00.123456789 UTC
+            None,
+            Some(1609459200000000000), // 2021-01-01 00:00:00.000000000 UTC (no fractional seconds)
+        ];
+        let timestamp_array = TimestampNanosecondArray::from(timestamp_data);
+
+        let expected_with_nanos = DateTime::from_timestamp(1609459200, 123456789)
+            .unwrap()
+            .naive_utc();
+        let expected_no_nanos = DateTime::from_timestamp(1609459200, 0).unwrap().naive_utc();
+
+        test_row_builder_basic(
+            &timestamp_array,
+            vec![
+                Some(Variant::from(expected_with_nanos)),
+                None,
+                Some(Variant::from(expected_no_nanos)),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_timestamp_millisecond_row_builder() {
+        use arrow::array::TimestampMillisecondArray;
+
+        let timestamp_data = vec![
+            Some(1609459200123), // 2021-01-01 00:00:00.123 UTC
+            None,
+            Some(1609459200000), // 2021-01-01 00:00:00.000 UTC
+        ];
+        let timestamp_array = TimestampMillisecondArray::from(timestamp_data);
+
+        let expected_with_millis = DateTime::from_timestamp(1609459200, 123000000)
+            .unwrap()
+            .naive_utc();
+        let expected_no_millis = DateTime::from_timestamp(1609459200, 0).unwrap().naive_utc();
+
+        test_row_builder_basic(
+            &timestamp_array,
+            vec![
+                Some(Variant::from(expected_with_millis)),
+                None,
+                Some(Variant::from(expected_no_millis)),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_date32_row_builder() {
+        use arrow::array::Date32Array;
+        use chrono::NaiveDate;
+
+        let date_data = vec![
+            Some(0), // 1970-01-01
+            None,
+            Some(19723),   // 2024-01-01 (days since epoch)
+            Some(-719162), // 0001-01-01 (near minimum)
+        ];
+        let date_array = Date32Array::from(date_data);
+
+        let expected_epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
+        let expected_2024 = NaiveDate::from_ymd_opt(2024, 1, 1).unwrap();
+        let expected_min = NaiveDate::from_ymd_opt(1, 1, 1).unwrap();
+
+        test_row_builder_basic(
+            &date_array,
+            vec![
+                Some(Variant::from(expected_epoch)),
+                None,
+                Some(Variant::from(expected_2024)),
+                Some(Variant::from(expected_min)),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_date64_row_builder() {
+        use arrow::array::Date64Array;
+        use chrono::NaiveDate;
+
+        // Test Date64Array with various dates (milliseconds since epoch)
+        let date_data = vec![
+            Some(0), // 1970-01-01
+            None,
+            Some(1704067200000), // 2024-01-01 (milliseconds since epoch)
+            Some(86400000),      // 1970-01-02
+        ];
+        let date_array = Date64Array::from(date_data);
+
+        let expected_epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
+        let expected_2024 = NaiveDate::from_ymd_opt(2024, 1, 1).unwrap();
+        let expected_next_day = NaiveDate::from_ymd_opt(1970, 1, 2).unwrap();
+
+        test_row_builder_basic(
+            &date_array,
+            vec![
+                Some(Variant::from(expected_epoch)),
+                None,
+                Some(Variant::from(expected_2024)),
+                Some(Variant::from(expected_next_day)),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_time32_second_row_builder() {
+        use arrow::array::Time32SecondArray;
+        use chrono::NaiveTime;
+
+        // Test Time32SecondArray with various times (seconds since midnight)
+        let time_data = vec![
+            Some(0), // 00:00:00
+            None,
+            Some(3661),  // 01:01:01
+            Some(86399), // 23:59:59
+        ];
+        let time_array = Time32SecondArray::from(time_data);
+
+        let expected_midnight = NaiveTime::from_hms_opt(0, 0, 0).unwrap();
+        let expected_time = NaiveTime::from_hms_opt(1, 1, 1).unwrap();
+        let expected_last = NaiveTime::from_hms_opt(23, 59, 59).unwrap();
+
+        test_row_builder_basic(
+            &time_array,
+            vec![
+                Some(Variant::from(expected_midnight)),
+                None,
+                Some(Variant::from(expected_time)),
+                Some(Variant::from(expected_last)),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_time32_millisecond_row_builder() {
+        use arrow::array::Time32MillisecondArray;
+        use chrono::NaiveTime;
+
+        // Test Time32MillisecondArray with various times (milliseconds since midnight)
+        let time_data = vec![
+            Some(0), // 00:00:00.000
+            None,
+            Some(3661123),  // 01:01:01.123
+            Some(86399999), // 23:59:59.999
+        ];
+        let time_array = Time32MillisecondArray::from(time_data);
+
+        let expected_midnight = NaiveTime::from_hms_milli_opt(0, 0, 0, 0).unwrap();
+        let expected_time = NaiveTime::from_hms_milli_opt(1, 1, 1, 123).unwrap();
+        let expected_last = NaiveTime::from_hms_milli_opt(23, 59, 59, 999).unwrap();
+
+        test_row_builder_basic(
+            &time_array,
+            vec![
+                Some(Variant::from(expected_midnight)),
+                None,
+                Some(Variant::from(expected_time)),
+                Some(Variant::from(expected_last)),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_time64_microsecond_row_builder() {
+        use arrow::array::Time64MicrosecondArray;
+        use chrono::NaiveTime;
+
+        // Test Time64MicrosecondArray with various times (microseconds since midnight)
+        let time_data = vec![
+            Some(0), // 00:00:00.000000
+            None,
+            Some(3661123456),  // 01:01:01.123456
+            Some(86399999999), // 23:59:59.999999
+        ];
+        let time_array = Time64MicrosecondArray::from(time_data);
+
+        let expected_midnight = NaiveTime::from_hms_micro_opt(0, 0, 0, 0).unwrap();
+        let expected_time = NaiveTime::from_hms_micro_opt(1, 1, 1, 123456).unwrap();
+        let expected_last = NaiveTime::from_hms_micro_opt(23, 59, 59, 999999).unwrap();
+
+        test_row_builder_basic(
+            &time_array,
+            vec![
+                Some(Variant::from(expected_midnight)),
+                None,
+                Some(Variant::from(expected_time)),
+                Some(Variant::from(expected_last)),
+            ],
+        );
+    }
+
+    #[test]
+    fn test_time64_nanosecond_row_builder() {
+        use arrow::array::Time64NanosecondArray;
+        use chrono::NaiveTime;
+
+        // Test Time64NanosecondArray with various times (nanoseconds since midnight)
+        let time_data = vec![
+            Some(0), // 00:00:00.000000000
+            None,
+            Some(3661123456789),  // 01:01:01.123456789
+            Some(86399999999999), // 23:59:59.999999999
+        ];
+        let time_array = Time64NanosecondArray::from(time_data);
+
+        let expected_midnight = NaiveTime::from_hms_nano_opt(0, 0, 0, 0).unwrap();
+        // Nanoseconds are truncated to microsecond precision in Variant
+        let expected_time = NaiveTime::from_hms_micro_opt(1, 1, 1, 123456).unwrap();
+        let expected_last = NaiveTime::from_hms_micro_opt(23, 59, 59, 999999).unwrap();
+
+        test_row_builder_basic(
+            &time_array,
+            vec![
+                Some(Variant::from(expected_midnight)),
+                None,
+                Some(Variant::from(expected_time)),
+                Some(Variant::from(expected_last)),
+            ],
+        );
+    }
+}
diff --git a/parquet-variant-compute/src/cast_to_variant.rs b/parquet-variant-compute/src/cast_to_variant.rs
index 231d36f96e..3499470f59 100644
--- a/parquet-variant-compute/src/cast_to_variant.rs
+++ b/parquet-variant-compute/src/cast_to_variant.rs
@@ -15,131 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
-use std::sync::Arc;
-
-use crate::type_conversion::{
-    decimal_to_variant_decimal, generic_conversion_array, non_generic_conversion_array,
-    primitive_conversion_array, timestamp_to_variant_timestamp,
-};
-use crate::{VariantArray, VariantArrayBuilder};
-use arrow::array::{
-    Array, AsArray, OffsetSizeTrait, TimestampMicrosecondArray, TimestampMillisecondArray,
-    TimestampNanosecondArray, TimestampSecondArray,
-};
-use arrow::buffer::{OffsetBuffer, ScalarBuffer};
-use arrow::compute::kernels::cast;
-use arrow::datatypes::{
-    i256, ArrowNativeType, BinaryType, BinaryViewType, Date32Type, Date64Type, Decimal128Type,
-    Decimal256Type, Decimal32Type, Decimal64Type, Float16Type, Float32Type, Float64Type, Int16Type,
-    Int32Type, Int64Type, Int8Type, LargeBinaryType, RunEndIndexType, Time32MillisecondType,
-    Time32SecondType, Time64MicrosecondType, Time64NanosecondType, UInt16Type, UInt32Type,
-    UInt64Type, UInt8Type,
-};
-use arrow::temporal_conversions::{
-    timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_s_to_datetime,
-    timestamp_us_to_datetime,
-};
-use arrow_schema::{ArrowError, DataType, FieldRef, TimeUnit, UnionFields};
-use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
-use parquet_variant::{
-    Variant, VariantBuilder, VariantDecimal16, VariantDecimal4, VariantDecimal8,
-};
-
-/// Options for controlling the behavior of `cast_to_variant_with_options`.
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct CastOptions {
-    /// If true, return error on conversion failure. If false, insert null for failed conversions.
-    pub strict: bool,
-}
-
-impl Default for CastOptions {
-    fn default() -> Self {
-        Self { strict: true }
-    }
-}
-
-fn convert_timestamp_with_options(
-    time_unit: &TimeUnit,
-    time_zone: &Option<Arc<str>>,
-    input: &dyn Array,
-    builder: &mut VariantArrayBuilder,
-    options: &CastOptions,
-) -> Result<(), ArrowError> {
-    let native_datetimes: Vec<Option<NaiveDateTime>> = match time_unit {
-        arrow_schema::TimeUnit::Second => {
-            let ts_array = input
-                .as_any()
-                .downcast_ref::<TimestampSecondArray>()
-                .expect("Array is not TimestampSecondArray");
-            timestamp_to_variant_timestamp!(
-                ts_array,
-                timestamp_s_to_datetime,
-                "seconds",
-                options.strict
-            )
-        }
-        arrow_schema::TimeUnit::Millisecond => {
-            let ts_array = input
-                .as_any()
-                .downcast_ref::<TimestampMillisecondArray>()
-                .expect("Array is not TimestampMillisecondArray");
-            timestamp_to_variant_timestamp!(
-                ts_array,
-                timestamp_ms_to_datetime,
-                "milliseconds",
-                options.strict
-            )
-        }
-        arrow_schema::TimeUnit::Microsecond => {
-            let ts_array = input
-                .as_any()
-                .downcast_ref::<TimestampMicrosecondArray>()
-                .expect("Array is not TimestampMicrosecondArray");
-            timestamp_to_variant_timestamp!(
-                ts_array,
-                timestamp_us_to_datetime,
-                "microseconds",
-                options.strict
-            )
-        }
-        arrow_schema::TimeUnit::Nanosecond => {
-            let ts_array = input
-                .as_any()
-                .downcast_ref::<TimestampNanosecondArray>()
-                .expect("Array is not TimestampNanosecondArray");
-            timestamp_to_variant_timestamp!(
-                ts_array,
-                timestamp_ns_to_datetime,
-                "nanoseconds",
-                options.strict
-            )
-        }
-    };
-
-    for (i, x) in native_datetimes.iter().enumerate() {
-        match x {
-            Some(ndt) => {
-                if time_zone.is_none() {
-                    builder.append_variant((*ndt).into());
-                } else {
-                    let utc_dt: DateTime<Utc> = Utc.from_utc_datetime(ndt);
-                    builder.append_variant(utc_dt.into());
-                }
-            }
-            None if options.strict && input.is_valid(i) => {
-                return Err(ArrowError::ComputeError(format!(
-                    "Failed to convert timestamp at index {}: invalid timestamp value",
-                    i
-                )));
-            }
-            None => {
-                builder.append_null();
-            }
-        }
-    }
-    Ok(())
-}
+use crate::arrow_to_variant::make_arrow_to_variant_row_builder;
+use crate::{CastOptions, VariantArray, VariantArrayBuilder};
+use arrow::array::Array;
+use arrow_schema::ArrowError;
 
 /// Casts a typed arrow [`Array`] to a [`VariantArray`]. This is useful when you
 /// need to convert a specific data type
@@ -178,489 +57,34 @@ pub fn cast_to_variant_with_options(
     input: &dyn Array,
     options: &CastOptions,
 ) -> Result<VariantArray, ArrowError> {
-    let mut builder = VariantArrayBuilder::new(input.len());
-
-    let input_type = input.data_type();
-    match input_type {
-        DataType::Null => {
-            for _ in 0..input.len() {
-                builder.append_null();
-            }
-        }
-        DataType::Boolean => {
-            non_generic_conversion_array!(input.as_boolean(), |v| v, builder);
-        }
-        DataType::Int8 => {
-            primitive_conversion_array!(Int8Type, input, builder);
-        }
-        DataType::Int16 => {
-            primitive_conversion_array!(Int16Type, input, builder);
-        }
-        DataType::Int32 => {
-            primitive_conversion_array!(Int32Type, input, builder);
-        }
-        DataType::Int64 => {
-            primitive_conversion_array!(Int64Type, input, builder);
-        }
-        DataType::UInt8 => {
-            primitive_conversion_array!(UInt8Type, input, builder);
-        }
-        DataType::UInt16 => {
-            primitive_conversion_array!(UInt16Type, input, builder);
-        }
-        DataType::UInt32 => {
-            primitive_conversion_array!(UInt32Type, input, builder);
-        }
-        DataType::UInt64 => {
-            primitive_conversion_array!(UInt64Type, input, builder);
-        }
-        DataType::Float16 => {
-            generic_conversion_array!(Float16Type, as_primitive, f32::from, input, builder);
-        }
-        DataType::Float32 => {
-            primitive_conversion_array!(Float32Type, input, builder);
-        }
-        DataType::Float64 => {
-            primitive_conversion_array!(Float64Type, input, builder);
-        }
-        DataType::Decimal32(_, scale) => {
-            generic_conversion_array!(
-                Decimal32Type,
-                as_primitive,
-                |v| decimal_to_variant_decimal!(v, scale, i32, VariantDecimal4),
-                input,
-                builder
-            );
-        }
-        DataType::Decimal64(_, scale) => {
-            generic_conversion_array!(
-                Decimal64Type,
-                as_primitive,
-                |v| decimal_to_variant_decimal!(v, scale, i64, VariantDecimal8),
-                input,
-                builder
-            );
-        }
-        DataType::Decimal128(_, scale) => {
-            generic_conversion_array!(
-                Decimal128Type,
-                as_primitive,
-                |v| decimal_to_variant_decimal!(v, scale, i128, VariantDecimal16),
-                input,
-                builder
-            );
-        }
-        DataType::Decimal256(_, scale) => {
-            generic_conversion_array!(
-                Decimal256Type,
-                as_primitive,
-                |v: i256| {
-                    // Since `i128::MAX` is larger than the max value of `VariantDecimal16`,
-                    // any `i256` value that cannot be cast to `i128` is unable to be cast to `VariantDecimal16` either.
-                    // Therefore, we can safely convert `i256` to `i128` first and process it like `i128`.
-                    if let Some(v) = v.to_i128() {
-                        decimal_to_variant_decimal!(v, scale, i128, VariantDecimal16)
-                    } else {
-                        Variant::Null
-                    }
-                },
-                input,
-                builder
-            );
-        }
-        DataType::Timestamp(time_unit, time_zone) => {
-            convert_timestamp_with_options(time_unit, time_zone, input, &mut builder, options)?;
-        }
-        DataType::Time32(unit) => {
-            match *unit {
-                TimeUnit::Second => {
-                    generic_conversion_array!(
-                        Time32SecondType,
-                        as_primitive,
-                        // nano second are always 0
-                        |v| NaiveTime::from_num_seconds_from_midnight_opt(v as u32, 0u32),
-                        input,
-                        builder,
-                        options.strict
-                    )?;
-                }
-                TimeUnit::Millisecond => {
-                    generic_conversion_array!(
-                        Time32MillisecondType,
-                        as_primitive,
-                        |v| NaiveTime::from_num_seconds_from_midnight_opt(
-                            v as u32 / 1000,
-                            (v as u32 % 1000) * 1_000_000
-                        ),
-                        input,
-                        builder,
-                        options.strict
-                    )?;
-                }
-                _ => {
-                    return Err(ArrowError::CastError(format!(
-                        "Unsupported Time32 unit: {:?}",
-                        unit
-                    )));
-                }
-            };
-        }
-        DataType::Time64(unit) => {
-            match *unit {
-                TimeUnit::Microsecond => {
-                    generic_conversion_array!(
-                        Time64MicrosecondType,
-                        as_primitive,
-                        |v| NaiveTime::from_num_seconds_from_midnight_opt(
-                            (v / 1_000_000) as u32,
-                            (v % 1_000_000 * 1_000) as u32
-                        ),
-                        input,
-                        builder,
-                        options.strict
-                    )?;
-                }
-                TimeUnit::Nanosecond => {
-                    generic_conversion_array!(
-                        Time64NanosecondType,
-                        as_primitive,
-                        |v| NaiveTime::from_num_seconds_from_midnight_opt(
-                            (v / 1_000_000_000) as u32,
-                            (v % 1_000_000_000) as u32
-                        ),
-                        input,
-                        builder,
-                        options.strict
-                    )?;
-                }
-                _ => {
-                    return Err(ArrowError::CastError(format!(
-                        "Unsupported Time64 unit: {:?}",
-                        unit
-                    )));
-                }
-            };
-        }
-        DataType::Duration(_) | DataType::Interval(_) => {
-            return Err(ArrowError::InvalidArgumentError(
-                "Casting duration/interval types to Variant is not supported. \
-                 The Variant format does not define duration/interval types."
-                    .to_string(),
-            ));
-        }
-        DataType::Binary => {
-            generic_conversion_array!(BinaryType, as_bytes, |v| v, input, builder);
-        }
-        DataType::LargeBinary => {
-            generic_conversion_array!(LargeBinaryType, as_bytes, |v| v, input, builder);
-        }
-        DataType::BinaryView => {
-            generic_conversion_array!(BinaryViewType, as_byte_view, |v| v, input, builder);
-        }
-        DataType::FixedSizeBinary(_) => {
-            non_generic_conversion_array!(input.as_fixed_size_binary(), |v| v, builder);
-        }
-        DataType::Utf8 => {
-            generic_conversion_array!(i32, as_string, |v| v, input, builder);
-        }
-        DataType::LargeUtf8 => {
-            generic_conversion_array!(i64, as_string, |v| v, input, builder);
-        }
-        DataType::Utf8View => {
-            non_generic_conversion_array!(input.as_string_view(), |v| v, builder);
-        }
-        DataType::Date32 => {
-            generic_conversion_array!(
-                Date32Type,
-                as_primitive,
-                |v: i32| -> NaiveDate { Date32Type::to_naive_date(v) },
-                input,
-                builder
-            );
-        }
-        DataType::Date64 => {
-            generic_conversion_array!(
-                Date64Type,
-                as_primitive,
-                |v: i64| Date64Type::to_naive_date_opt(v),
-                input,
-                builder,
-                options.strict
-            )?;
-        }
-        DataType::List(_) => convert_list::<i32>(input, &mut builder)?,
-        DataType::LargeList(_) => convert_list::<i64>(input, &mut builder)?,
-        DataType::Struct(_) => convert_struct(input, &mut builder)?,
-        DataType::Map(field, _) => convert_map(field, input, &mut builder)?,
-        DataType::Union(fields, _) => convert_union(fields, input, &mut builder)?,
-        DataType::Dictionary(_, _) => convert_dictionary_encoded(input, &mut builder)?,
-        DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
-            DataType::Int16 => convert_run_end_encoded::<Int16Type>(input, 
&mut builder)?,
-            DataType::Int32 => convert_run_end_encoded::<Int32Type>(input, 
&mut builder)?,
-            DataType::Int64 => convert_run_end_encoded::<Int64Type>(input, 
&mut builder)?,
-            _ => {
-                return Err(ArrowError::CastError(format!(
-                    "Unsupported run ends type: {:?}",
-                    run_ends.data_type()
-                )));
-            }
-        },
-        dt => {
-            return Err(ArrowError::CastError(format!(
-                "Unsupported data type for casting to Variant: {dt:?}",
-            )));
-        }
-    };
-    Ok(builder.build())
-}
-
-/// Generic function to convert list arrays (both List and LargeList) to 
variant arrays
-fn convert_list<O: OffsetSizeTrait>(
-    input: &dyn Array,
-    builder: &mut VariantArrayBuilder,
-) -> Result<(), ArrowError> {
-    let list_array = input.as_list::<O>();
-    let values = list_array.values();
-    let offsets = list_array.offsets();
-
-    let first_offset = *offsets.first().expect("There should be an offset");
-    let length = *offsets.last().expect("There should be an offset") - 
first_offset;
-    let sliced_values = values.slice(first_offset.as_usize(), 
length.as_usize());
-
-    let values_variant_array = cast_to_variant(sliced_values.as_ref())?;
-    let new_offsets = OffsetBuffer::new(ScalarBuffer::from_iter(
-        offsets.iter().map(|o| *o - first_offset),
-    ));
-
-    for i in 0..list_array.len() {
-        if list_array.is_null(i) {
-            builder.append_null();
-            continue;
-        }
-
-        let start = new_offsets[i].as_usize();
-        let end = new_offsets[i + 1].as_usize();
-
-        // Start building the inner VariantList
-        let mut variant_builder = VariantBuilder::new();
-        let mut list_builder = variant_builder.new_list();
-
-        // Add all values from the slice
-        for j in start..end {
-            list_builder.append_value(values_variant_array.value(j));
-        }
-
-        list_builder.finish();
-
-        let (metadata, value) = variant_builder.finish();
-        let variant = Variant::new(&metadata, &value);
-        builder.append_variant(variant)
-    }
-
-    Ok(())
-}
-
-fn convert_struct(input: &dyn Array, builder: &mut VariantArrayBuilder) -> 
Result<(), ArrowError> {
-    let struct_array = input.as_struct();
-
-    // Pre-convert all field arrays once for better performance
-    // This avoids converting the same field array multiple times
-    // Alternative approach: Use slicing per row: field_array.slice(i, 1)
-    // However, pre-conversion is more efficient for typical use cases
-    let field_variant_arrays: Result<Vec<_>, _> = struct_array
-        .columns()
-        .iter()
-        .map(|field_array| cast_to_variant(field_array.as_ref()))
-        .collect();
-    let field_variant_arrays = field_variant_arrays?;
-
-    // Cache column names to avoid repeated calls
-    let column_names = struct_array.column_names();
-
-    for i in 0..struct_array.len() {
-        if struct_array.is_null(i) {
-            builder.append_null();
-            continue;
-        }
-
-        // Create a VariantBuilder for this struct instance
-        let mut variant_builder = VariantBuilder::new();
-        let mut object_builder = variant_builder.new_object();
-
-        // Iterate through all fields in the struct
-        for (field_idx, field_name) in column_names.iter().enumerate() {
-            // Use pre-converted field variant arrays for better performance
-            // Check nulls directly from the pre-converted arrays instead of 
accessing column again
-            if !field_variant_arrays[field_idx].is_null(i) {
-                let field_variant = field_variant_arrays[field_idx].value(i);
-                object_builder.insert(field_name, field_variant);
-            }
-            // Note: we skip null fields rather than inserting Variant::Null
-            // to match Arrow struct semantics where null fields are omitted
-        }
-
-        object_builder.finish();
-        let (metadata, value) = variant_builder.finish();
-        let variant = Variant::try_new(&metadata, &value)?;
-        builder.append_variant(variant);
-    }
-
-    Ok(())
-}
-
-fn convert_map(
-    field: &FieldRef,
-    input: &dyn Array,
-    builder: &mut VariantArrayBuilder,
-) -> Result<(), ArrowError> {
-    match field.data_type() {
-        DataType::Struct(_) => {
-            let map_array = input.as_map();
-            let keys = cast(map_array.keys(), &DataType::Utf8)?;
-            let key_strings = keys.as_string::<i32>();
-            let values = cast_to_variant(map_array.values())?;
-            let offsets = map_array.offsets();
-
-            let mut start_offset = offsets[0];
-            for end_offset in offsets.iter().skip(1) {
-                if start_offset >= *end_offset {
-                    builder.append_null();
-                    continue;
-                }
-
-                let length = (end_offset - start_offset) as usize;
-
-                let mut variant_builder = VariantBuilder::new();
-                let mut object_builder = variant_builder.new_object();
-
-                for i in start_offset..*end_offset {
-                    let value = values.value(i as usize);
-                    object_builder.insert(key_strings.value(i as usize), 
value);
-                }
-                object_builder.finish();
-                let (metadata, value) = variant_builder.finish();
-                let variant = Variant::try_new(&metadata, &value)?;
+    // Create row builder for the input array type
+    let mut row_builder = make_arrow_to_variant_row_builder(input.data_type(), 
input, options)?;
 
-                builder.append_variant(variant);
+    // Create output array builder
+    let mut array_builder = VariantArrayBuilder::new(input.len());
 
-                start_offset += length as i32;
-            }
-        }
-        _ => {
-            return Err(ArrowError::CastError(format!(
-                "Unsupported map field type for casting to Variant: {field:?}",
-            )));
-        }
+    // Process each row using the row builder
+    for i in 0..input.len() {
+        let mut builder = array_builder.variant_builder();
+        row_builder.append_row(&mut builder, i)?;
+        builder.finish();
     }
 
-    Ok(())
+    Ok(array_builder.build())
 }
 
-/// Convert an array to a `VariantArray` with strict mode enabled (returns 
errors on conversion failures).
+/// Convert an array to a [`VariantArray`] with strict mode enabled (returns 
errors on conversion
+/// failures).
 ///
 /// This function provides backward compatibility. For non-strict behavior,
-/// use `cast_to_variant_with_options` with `CastOptions { strict: false }`.
+/// use [`cast_to_variant_with_options`] with `CastOptions { strict: false }`.
 pub fn cast_to_variant(input: &dyn Array) -> Result<VariantArray, ArrowError> {
     cast_to_variant_with_options(input, &CastOptions::default())
 }
 
-/// Convert union arrays
-fn convert_union(
-    fields: &UnionFields,
-    input: &dyn Array,
-    builder: &mut VariantArrayBuilder,
-) -> Result<(), ArrowError> {
-    let union_array = input.as_union();
-
-    // Convert each child array to variant arrays
-    let mut child_variant_arrays = HashMap::new();
-    for (type_id, _) in fields.iter() {
-        let child_array = union_array.child(type_id);
-        let child_variant_array = cast_to_variant(child_array.as_ref())?;
-        child_variant_arrays.insert(type_id, child_variant_array);
-    }
-
-    // Process each element in the union array
-    for i in 0..union_array.len() {
-        let type_id = union_array.type_id(i);
-        let value_offset = union_array.value_offset(i);
-
-        if let Some(child_variant_array) = child_variant_arrays.get(&type_id) {
-            if child_variant_array.is_null(value_offset) {
-                builder.append_null();
-            } else {
-                let value = child_variant_array.value(value_offset);
-                builder.append_variant(value);
-            }
-        } else {
-            // This should not happen in a valid union, but handle gracefully
-            builder.append_null();
-        }
-    }
-
-    Ok(())
-}
-
-fn convert_dictionary_encoded(
-    input: &dyn Array,
-    builder: &mut VariantArrayBuilder,
-) -> Result<(), ArrowError> {
-    let dict_array = input.as_any_dictionary();
-    let values_variant_array = cast_to_variant(dict_array.values().as_ref())?;
-    let normalized_keys = dict_array.normalized_keys();
-    let keys = dict_array.keys();
-
-    for (i, key_idx) in normalized_keys.iter().enumerate() {
-        if keys.is_null(i) {
-            builder.append_null();
-            continue;
-        }
-
-        if values_variant_array.is_null(*key_idx) {
-            builder.append_null();
-            continue;
-        }
-
-        let value = values_variant_array.value(*key_idx);
-        builder.append_variant(value);
-    }
-
-    Ok(())
-}
-
-fn convert_run_end_encoded<R: RunEndIndexType>(
-    input: &dyn Array,
-    builder: &mut VariantArrayBuilder,
-) -> Result<(), ArrowError> {
-    let run_array = input.as_run::<R>();
-    let values_variant_array = cast_to_variant(run_array.values().as_ref())?;
-
-    // Process runs in batches for better performance
-    let run_ends = run_array.run_ends().values();
-    let mut logical_start = 0;
-
-    for (physical_idx, &run_end) in run_ends.iter().enumerate() {
-        let logical_end = run_end.as_usize();
-        let run_length = logical_end - logical_start;
-
-        if values_variant_array.is_null(physical_idx) {
-            // Append nulls for the entire run
-            for _ in 0..run_length {
-                builder.append_null();
-            }
-        } else {
-            // Get the value once and append it for the entire run
-            let value = values_variant_array.value(physical_idx);
-            for _ in 0..run_length {
-                builder.append_variant(value.clone());
-            }
-        }
-
-        logical_start = logical_end;
-    }
-
-    Ok(())
-}
+// TODO do we need a cast_with_options to allow specifying conversion behavior,
+// e.g. how to handle overflows, whether to convert to Variant::Null or return
+// an error, etc. ?
 
 #[cfg(test)]
 mod tests {
@@ -674,17 +98,24 @@ mod tests {
         IntervalDayTimeArray, IntervalMonthDayNanoArray, 
IntervalYearMonthArray, LargeListArray,
         LargeStringArray, ListArray, MapArray, NullArray, StringArray, 
StringRunBuilder,
         StringViewArray, StructArray, Time32MillisecondArray, 
Time32SecondArray,
-        Time64MicrosecondArray, Time64NanosecondArray, TimestampSecondArray, 
UInt16Array,
+        Time64MicrosecondArray, Time64NanosecondArray, 
TimestampMicrosecondArray,
+        TimestampMillisecondArray, TimestampNanosecondArray, 
TimestampSecondArray, UInt16Array,
         UInt32Array, UInt64Array, UInt8Array, UnionArray,
     };
     use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
-    use arrow::datatypes::{IntervalDayTime, IntervalMonthDayNano};
+    use arrow::datatypes::{
+        i256, BinaryType, BinaryViewType, Date32Type, Date64Type, Int32Type, 
Int64Type, Int8Type,
+        IntervalDayTime, IntervalMonthDayNano, LargeBinaryType,
+    };
     use arrow_schema::{DataType, Field, Fields, UnionFields};
     use arrow_schema::{
         DECIMAL128_MAX_PRECISION, DECIMAL32_MAX_PRECISION, 
DECIMAL64_MAX_PRECISION,
     };
+    use chrono::{DateTime, NaiveDate, NaiveTime};
     use half::f16;
-    use parquet_variant::{Variant, VariantDecimal16};
+    use parquet_variant::{
+        Variant, VariantBuilder, VariantDecimal16, VariantDecimal4, 
VariantDecimal8,
+    };
     use std::{sync::Arc, vec};
 
     macro_rules! max_unscaled_value {
@@ -2139,33 +1570,64 @@ mod tests {
     }
 
     #[test]
-    fn test_cast_to_variant_map_with_nulls() {
-        let keys = vec!["key1", "key2", "key3"];
-        let values_data = Int32Array::from(vec![1, 2, 3]);
-        let entry_offsets = vec![0, 1, 1, 3];
-        let map_array =
-            MapArray::new_from_strings(keys.clone().into_iter(), &values_data, 
&entry_offsets)
-                .unwrap();
+    fn test_cast_to_variant_map_with_nulls_and_empty() {
+        use arrow::array::{Int32Array, MapArray, StringArray, StructArray};
+        use arrow::buffer::{NullBuffer, OffsetBuffer};
+        use arrow::datatypes::{DataType, Field, Fields};
+        use std::sync::Arc;
+
+        // Create entries struct array
+        let keys = StringArray::from(vec!["key1", "key2", "key3"]);
+        let values = Int32Array::from(vec![1, 2, 3]);
+        let entries_fields = Fields::from(vec![
+            Field::new("key", DataType::Utf8, false),
+            Field::new("value", DataType::Int32, true),
+        ]);
+        let entries = StructArray::new(
+            entries_fields.clone(),
+            vec![Arc::new(keys), Arc::new(values)],
+            None,
+        );
+
+        // Create offsets for 4 maps: [0..1], [1..1], [1..1], [1..3]
+        let offsets = OffsetBuffer::new(vec![0, 1, 1, 1, 3].into());
+
+        // Create null buffer - map at index 2 is NULL
+        let null_buffer = Some(NullBuffer::from(vec![true, true, false, 
true]));
+
+        let map_field = Arc::new(Field::new(
+            "entries",
+            DataType::Struct(entries_fields),
+            false,
+        ));
+
+        let map_array = MapArray::try_new(map_field, offsets, entries, 
null_buffer, false).unwrap();
 
         let result = cast_to_variant(&map_array).unwrap();
-        // [{"key1":1}]
-        let variant1 = result.value(0);
+
+        // Map 0: {"key1": 1}
+        let variant0 = result.value(0);
         assert_eq!(
-            variant1.as_object().unwrap().get("key1").unwrap(),
+            variant0.as_object().unwrap().get("key1").unwrap(),
             Variant::from(1)
         );
 
-        // None
-        assert!(result.is_null(1));
+        // Map 1: {} (empty, not null)
+        let variant1 = result.value(1);
+        let obj1 = variant1.as_object().unwrap();
+        assert_eq!(obj1.len(), 0); // Empty object
 
-        // [{"key2":2},{"key3":3}]
-        let variant2 = result.value(2);
+        // Map 2: null (actual NULL)
+        assert!(result.is_null(2));
+
+        // Map 3: {"key2": 2, "key3": 3}
+        let variant3 = result.value(3);
         assert_eq!(
-            variant2.as_object().unwrap().get("key2").unwrap(),
+            variant3.as_object().unwrap().get("key2").unwrap(),
             Variant::from(2)
         );
         assert_eq!(
-            variant2.as_object().unwrap().get("key3").unwrap(),
+            variant3.as_object().unwrap().get("key3").unwrap(),
             Variant::from(3)
         );
     }
@@ -2448,6 +1910,8 @@ mod tests {
 
     #[test]
     fn test_cast_to_variant_non_strict_mode_timestamp() {
+        use arrow::temporal_conversions::timestamp_s_to_datetime;
+
         let ts_array = TimestampSecondArray::from(vec![Some(i64::MAX), 
Some(0), Some(1609459200)])
             .with_timezone_opt(None::<&str>);
 
diff --git a/parquet-variant-compute/src/lib.rs 
b/parquet-variant-compute/src/lib.rs
index 3c928636ac..e9a6e0c49f 100644
--- a/parquet-variant-compute/src/lib.rs
+++ b/parquet-variant-compute/src/lib.rs
@@ -35,6 +35,7 @@
 //! [`VariantPath`]: parquet_variant::VariantPath
 //! [Variant issue]: https://github.com/apache/arrow-rs/issues/6736
 
+mod arrow_to_variant;
 pub mod cast_to_variant;
 mod from_json;
 mod to_json;
@@ -46,6 +47,7 @@ pub mod variant_get;
 pub use variant_array::{ShreddingState, VariantArray};
 pub use variant_array_builder::{VariantArrayBuilder, 
VariantArrayVariantBuilder};
 
-pub use cast_to_variant::{cast_to_variant, cast_to_variant_with_options, 
CastOptions};
+pub use cast_to_variant::{cast_to_variant, cast_to_variant_with_options};
 pub use from_json::json_to_variant;
 pub use to_json::variant_to_json;
+pub use type_conversion::CastOptions;
diff --git a/parquet-variant-compute/src/type_conversion.rs 
b/parquet-variant-compute/src/type_conversion.rs
index aa60b425a1..d2a63f49de 100644
--- a/parquet-variant-compute/src/type_conversion.rs
+++ b/parquet-variant-compute/src/type_conversion.rs
@@ -17,46 +17,18 @@
 
 //! Module for transforming a typed arrow `Array` to `VariantArray`.
 
-/// Convert the input array to a `VariantArray` row by row, using `method`
-/// not requiring a generic type to downcast the generic array to a specific
-/// array type and `cast_fn` to transform each element to a type compatible 
with Variant
-/// If `strict` is true(default), return error on conversion failure. If 
false, insert null.
-macro_rules! non_generic_conversion_array {
-    ($array:expr, $cast_fn:expr, $builder:expr) => {{
-        let array = $array;
-        for i in 0..array.len() {
-            if array.is_null(i) {
-                $builder.append_null();
-                continue;
-            }
-            let cast_value = $cast_fn(array.value(i));
-            $builder.append_variant(Variant::from(cast_value));
-        }
-    }};
-    ($array:expr, $cast_fn:expr, $builder:expr, $strict:expr) => {{
-        let array = $array;
-        for i in 0..array.len() {
-            if array.is_null(i) {
-                $builder.append_null();
-                continue;
-            }
-            match $cast_fn(array.value(i)) {
-                Some(cast_value) => {
-                    $builder.append_variant(Variant::from(cast_value));
-                }
-                None if $strict => {
-                    return Err(ArrowError::ComputeError(format!(
-                        "Failed to convert value at index {}: conversion 
failed",
-                        i
-                    )));
-                }
-                None => $builder.append_null(),
-            }
-        }
-        Ok::<(), ArrowError>(())
-    }};
+/// Options for controlling the behavior of `cast_to_variant_with_options`.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CastOptions {
+    /// If true, return error on conversion failure. If false, insert null for 
failed conversions.
+    pub strict: bool,
+}
+
+impl Default for CastOptions {
+    fn default() -> Self {
+        Self { strict: true }
+    }
 }
-pub(crate) use non_generic_conversion_array;
 
 /// Convert the value at a specific index in the given array into a `Variant`.
 macro_rules! non_generic_conversion_single_value {
@@ -72,29 +44,6 @@ macro_rules! non_generic_conversion_single_value {
 }
 pub(crate) use non_generic_conversion_single_value;
 
-/// Convert the input array to a `VariantArray` row by row, using `method`
-/// requiring a generic type to downcast the generic array to a specific
-/// array type and `cast_fn` to transform each element to a type compatible 
with Variant
-/// If `strict` is true(default), return error on conversion failure. If 
false, insert null.
-macro_rules! generic_conversion_array {
-    ($t:ty, $method:ident, $cast_fn:expr, $input:expr, $builder:expr) => {{
-        $crate::type_conversion::non_generic_conversion_array!(
-            $input.$method::<$t>(),
-            $cast_fn,
-            $builder
-        )
-    }};
-    ($t:ty, $method:ident, $cast_fn:expr, $input:expr, $builder:expr, 
$strict:expr) => {{
-        $crate::type_conversion::non_generic_conversion_array!(
-            $input.$method::<$t>(),
-            $cast_fn,
-            $builder,
-            $strict
-        )
-    }};
-}
-pub(crate) use generic_conversion_array;
-
 /// Convert the value at a specific index in the given array into a `Variant`,
 /// using `method` requiring a generic type to downcast the generic array
 /// to a specific array type and `cast_fn` to transform the element.
@@ -109,21 +58,6 @@ macro_rules! generic_conversion_single_value {
 }
 pub(crate) use generic_conversion_single_value;
 
-/// Convert the input array of a specific primitive type to a `VariantArray`
-/// row by row
-macro_rules! primitive_conversion_array {
-    ($t:ty, $input:expr, $builder:expr) => {{
-        $crate::type_conversion::generic_conversion_array!(
-            $t,
-            as_primitive,
-            |v| v,
-            $input,
-            $builder
-        )
-    }};
-}
-pub(crate) use primitive_conversion_array;
-
 /// Convert the value at a specific index in the given array into a `Variant`.
 macro_rules! primitive_conversion_single_value {
     ($t:ty, $input:expr, $index:expr) => {{
@@ -155,19 +89,3 @@ macro_rules! decimal_to_variant_decimal {
     }};
 }
 pub(crate) use decimal_to_variant_decimal;
-
-/// Convert a timestamp value to a `VariantTimestamp`
-macro_rules! timestamp_to_variant_timestamp {
-    ($ts_array:expr, $converter:expr, $unit_name:expr, $strict:expr) => {
-        if $strict {
-            let error =
-                || ArrowError::ComputeError(format!("Invalid timestamp {} 
value", $unit_name));
-            let converter = |x| $converter(x).ok_or_else(error);
-            let iter = $ts_array.iter().map(|x| x.map(converter).transpose());
-            iter.collect::<Result<Vec<_>, ArrowError>>()?
-        } else {
-            $ts_array.iter().map(|x| x.and_then($converter)).collect()
-        }
-    };
-}
-pub(crate) use timestamp_to_variant_timestamp;
diff --git a/parquet-variant-compute/src/variant_array_builder.rs 
b/parquet-variant-compute/src/variant_array_builder.rs
index d5f578421e..aa3e1dbdfc 100644
--- a/parquet-variant-compute/src/variant_array_builder.rs
+++ b/parquet-variant-compute/src/variant_array_builder.rs
@@ -199,9 +199,14 @@ pub struct VariantArrayVariantBuilder<'a> {
     metadata_offsets: &'a mut Vec<usize>,
     value_offsets: &'a mut Vec<usize>,
     nulls: &'a mut NullBufferBuilder,
+    is_null: bool,
 }
 
 impl VariantBuilderExt for VariantArrayVariantBuilder<'_> {
+    /// Appending NULL to a variant array produces an actual NULL value
+    fn append_null(&mut self) {
+        self.is_null = true;
+    }
     fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>) {
         ValueBuilder::append_variant(self.parent_state(), value.into());
     }
@@ -228,6 +233,7 @@ impl<'a> VariantArrayVariantBuilder<'a> {
             metadata_offsets: &mut builder.metadata_offsets,
             value_offsets: &mut builder.value_offsets,
             nulls: &mut builder.nulls,
+            is_null: false,
         }
     }
 
@@ -239,10 +245,20 @@ impl<'a> VariantArrayVariantBuilder<'a> {
     pub fn finish(mut self) {
         // Record the ending offsets after finishing metadata and finish the 
parent state.
         let (value_builder, metadata_builder) = 
self.parent_state.value_and_metadata_builders();
-        self.metadata_offsets.push(metadata_builder.finish());
-        self.value_offsets.push(value_builder.offset());
-        self.nulls.append_non_null();
-        self.parent_state.finish();
+        let (metadata_offset, value_offset, not_null) = if self.is_null {
+            // Do not `finish`, just repeat the previous offset for a 
physically empty result
+            let metadata_offset = 
self.metadata_offsets.last().copied().unwrap_or(0);
+            let value_offset = self.value_offsets.last().copied().unwrap_or(0);
+            (metadata_offset, value_offset, false)
+        } else {
+            let metadata_offset = metadata_builder.finish();
+            let value_offset = value_builder.offset();
+            self.parent_state.finish();
+            (metadata_offset, value_offset, true)
+        };
+        self.metadata_offsets.push(metadata_offset);
+        self.value_offsets.push(value_offset);
+        self.nulls.append(not_null);
     }
 
     fn parent_state(&mut self) -> ParentState<'_> {
diff --git a/parquet-variant-json/src/from_json.rs 
b/parquet-variant-json/src/from_json.rs
index 90b26f7d30..3a6e869ec1 100644
--- a/parquet-variant-json/src/from_json.rs
+++ b/parquet-variant-json/src/from_json.rs
@@ -18,7 +18,7 @@
 //! Module for parsing JSON strings as Variant
 
 use arrow_schema::ArrowError;
-use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilderExt};
+use parquet_variant::{ObjectFieldBuilder, Variant, VariantBuilderExt};
 use serde_json::{Number, Value};
 
 /// Converts a JSON string to Variant using a [`VariantBuilderExt`], such as
@@ -120,10 +120,7 @@ fn append_json(json: &Value, builder: &mut impl 
VariantBuilderExt) -> Result<(),
         Value::Object(obj) => {
             let mut obj_builder = builder.try_new_object()?;
             for (key, value) in obj.iter() {
-                let mut field_builder = ObjectFieldBuilder {
-                    key,
-                    builder: &mut obj_builder,
-                };
+                let mut field_builder = ObjectFieldBuilder::new(key, &mut 
obj_builder);
                 append_json(value, &mut field_builder)?;
             }
             obj_builder.finish();
@@ -132,25 +129,6 @@ fn append_json(json: &Value, builder: &mut impl 
VariantBuilderExt) -> Result<(),
     Ok(())
 }
 
-struct ObjectFieldBuilder<'o, 'v, 's> {
-    key: &'s str,
-    builder: &'o mut ObjectBuilder<'v>,
-}
-
-impl VariantBuilderExt for ObjectFieldBuilder<'_, '_, '_> {
-    fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>) {
-        self.builder.insert(self.key, value);
-    }
-
-    fn try_new_list(&mut self) -> Result<ListBuilder<'_>, ArrowError> {
-        self.builder.try_new_list(self.key)
-    }
-
-    fn try_new_object(&mut self) -> Result<ObjectBuilder<'_>, ArrowError> {
-        self.builder.try_new_object(self.key)
-    }
-}
-
 #[cfg(test)]
 mod test {
     use super::*;
diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs
index 2fa8d0981c..a7eb246798 100644
--- a/parquet-variant/src/builder.rs
+++ b/parquet-variant/src/builder.rs
@@ -1706,6 +1706,10 @@ impl<'a> ObjectBuilder<'a> {
 /// Allows users to append values to a [`VariantBuilder`], [`ListBuilder`] or
 /// [`ObjectBuilder`] using the same interface.
 pub trait VariantBuilderExt {
+    /// Appends a NULL value to this builder. The semantics depend on the 
implementation, but will
+    /// often translate to appending a [`Variant::Null`] value.
+    fn append_null(&mut self);
+
     /// Appends a new variant value to this builder. See e.g. 
[`VariantBuilder::append_value`].
     fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>);
 
@@ -1731,6 +1735,10 @@ pub trait VariantBuilderExt {
 }
 
 impl VariantBuilderExt for ListBuilder<'_> {
+    /// Variant lists cannot encode NULL elements, only `Variant::Null`.
+    fn append_null(&mut self) {
+        self.append_value(Variant::Null);
+    }
     fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>) {
         self.append_value(value);
     }
@@ -1745,6 +1753,11 @@ impl VariantBuilderExt for ListBuilder<'_> {
 }
 
 impl VariantBuilderExt for VariantBuilder {
+    /// Variant values cannot encode NULL, only [`Variant::Null`]. This is 
different from the column
+    /// that holds variant values being NULL at some positions.
+    fn append_null(&mut self) {
+        self.append_value(Variant::Null);
+    }
     fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>) {
         self.append_value(value);
     }
@@ -1758,6 +1771,34 @@ impl VariantBuilderExt for VariantBuilder {
     }
 }
 
+/// A [`VariantBuilderExt`] that inserts a new field into a variant object.
+pub struct ObjectFieldBuilder<'o, 'v, 's> {
+    key: &'s str,
+    builder: &'o mut ObjectBuilder<'v>,
+}
+
+impl<'o, 'v, 's> ObjectFieldBuilder<'o, 'v, 's> {
+    pub fn new(key: &'s str, builder: &'o mut ObjectBuilder<'v>) -> Self {
+        Self { key, builder }
+    }
+}
+
+impl VariantBuilderExt for ObjectFieldBuilder<'_, '_, '_> {
+    /// A NULL object field is interpreted as missing, so nothing gets 
inserted at all.
+    fn append_null(&mut self) {}
+    fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>) {
+        self.builder.insert(self.key, value);
+    }
+
+    fn try_new_list(&mut self) -> Result<ListBuilder<'_>, ArrowError> {
+        self.builder.try_new_list(self.key)
+    }
+
+    fn try_new_object(&mut self) -> Result<ObjectBuilder<'_>, ArrowError> {
+        self.builder.try_new_object(self.key)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::VariantMetadata;

Reply via email to