scovich commented on code in PR #7946: URL: https://github.com/apache/arrow-rs/pull/7946#discussion_r2219033484
########## parquet-variant-compute/src/variant_array.rs: ########## @@ -158,13 +159,93 @@ impl VariantArray { /// Return a reference to the metadata field of the [`StructArray`] pub fn metadata_field(&self) -> &ArrayRef { // spec says fields order is not guaranteed, so we search by name - &self.metadata_ref + self.inner.column_by_name("metadata").unwrap() } /// Return a reference to the value field of the `StructArray` pub fn value_field(&self) -> &ArrayRef { // spec says fields order is not guaranteed, so we search by name - &self.value_ref + self.inner.column_by_name("value").unwrap() + } + + /// Get the metadata bytes for a specific index + pub fn metadata_bytes(&self, index: usize) -> &[u8] { + self.metadata_field().as_binary_view().value(index).as_ref() + } + + /// Get the value bytes for a specific index + pub fn value_bytes(&self, index: usize) -> &[u8] { + self.value_field().as_binary_view().value(index).as_ref() + } + + /// Get the field names for an object at the given index + pub fn get_field_names(&self, index: usize) -> Vec<String> { + if index >= self.len() { + return vec![]; + } + + if self.is_null(index) { + return vec![]; + } + + let variant = self.value(index); + if let Some(obj) = variant.as_object() { + let mut field_names = Vec::new(); + for i in 0..obj.len() { + if let Some(field_name) = obj.field_name(i) { Review Comment: Seems like this should be an `unwrap`, since the only way to get None here is by an out of bounds index? ########## parquet-variant-compute/src/variant_array.rs: ########## @@ -158,13 +159,93 @@ impl VariantArray { /// Return a reference to the metadata field of the [`StructArray`] pub fn metadata_field(&self) -> &ArrayRef { // spec says fields order is not guaranteed, so we search by name - &self.metadata_ref + self.inner.column_by_name("metadata").unwrap() Review Comment: This will be solved/changed in - https://github.com/apache/arrow-rs/pull/7921/ right? 
########## parquet-variant-compute/src/variant_parser.rs: ########## Review Comment: There seems to be a lot of redundancy in this file vs. decoder.rs? Can we reduce or eliminate that? ########## parquet-variant-compute/src/variant_parser.rs: ########## @@ -0,0 +1,669 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Low-level binary format parsing for variant objects + +use arrow::error::ArrowError; + +/// Basic variant type enumeration for the first 2 bits of header +#[derive(Debug, Clone, PartialEq)] +pub enum VariantBasicType { + Primitive = 0, + ShortString = 1, + Object = 2, + Array = 3, +} + +/// Primitive type variants +#[derive(Debug, Clone, PartialEq)] +pub enum PrimitiveType { + Null, + True, + False, + Int8, + Int16, + Int32, + Int64, + Double, + Decimal4, + Decimal8, + Decimal16, + Date, + TimestampNtz, + TimestampLtz, + Float, + Binary, + String, +} + +/// Variant type enumeration covering all possible types +#[derive(Debug, Clone, PartialEq)] +pub enum VariantType { + Primitive(PrimitiveType), + ShortString(ShortStringHeader), + Object(ObjectHeader), + Array(ArrayHeader), +} + +/// Short string header structure +#[derive(Debug, Clone, PartialEq)] +pub struct ShortStringHeader { + pub length: usize, +} + +/// Object header structure for variant objects +#[derive(Debug, Clone, PartialEq)] +pub struct ObjectHeader { + pub num_elements_size: usize, + pub field_id_size: usize, + pub field_offset_size: usize, + pub is_large: bool, +} + +/// Array header structure for variant objects +#[derive(Debug, Clone, PartialEq)] +pub struct ArrayHeader { + pub num_elements_size: usize, + pub element_offset_size: usize, + pub is_large: bool, +} + +/// Object byte offsets structure +#[derive(Debug, Clone)] +pub struct ObjectOffsets { + pub field_ids_start: usize, + pub field_offsets_start: usize, + pub values_start: usize, +} + +/// Array byte offsets structure +#[derive(Debug, Clone)] +pub struct ArrayOffsets { + pub element_offsets_start: usize, + pub elements_start: usize, +} + +/// Low-level parser for variant binary format +pub struct VariantParser; + +impl VariantParser { + /// General dispatch function to parse any variant header + pub fn parse_variant_header(header_byte: u8) -> Result<VariantType, ArrowError> { + let basic_type = Self::get_basic_type(header_byte); + + match 
basic_type { + VariantBasicType::Primitive => Ok(VariantType::Primitive( + Self::parse_primitive_header(header_byte)?, + )), + VariantBasicType::ShortString => Ok(VariantType::ShortString( + Self::parse_short_string_header(header_byte)?, + )), + VariantBasicType::Object => { + Ok(VariantType::Object(Self::parse_object_header(header_byte)?)) + } + VariantBasicType::Array => { + Ok(VariantType::Array(Self::parse_array_header(header_byte)?)) + } + } + } + + /// Parse primitive type header + pub fn parse_primitive_header(header_byte: u8) -> Result<PrimitiveType, ArrowError> { + let primitive_type = header_byte >> 2; + + match primitive_type { + 0 => Ok(PrimitiveType::Null), + 1 => Ok(PrimitiveType::True), + 2 => Ok(PrimitiveType::False), + 3 => Ok(PrimitiveType::Int8), + 4 => Ok(PrimitiveType::Int16), + 5 => Ok(PrimitiveType::Int32), + 6 => Ok(PrimitiveType::Int64), + 7 => Ok(PrimitiveType::Double), + 8 => Ok(PrimitiveType::Decimal4), + 9 => Ok(PrimitiveType::Decimal8), + 10 => Ok(PrimitiveType::Decimal16), + 11 => Ok(PrimitiveType::Date), + 12 => Ok(PrimitiveType::TimestampNtz), + 13 => Ok(PrimitiveType::TimestampLtz), + 14 => Ok(PrimitiveType::Float), + 15 => Ok(PrimitiveType::Binary), + 16 => Ok(PrimitiveType::String), + _ => Err(ArrowError::InvalidArgumentError(format!( + "Invalid primitive type: {}", + primitive_type + ))), + } + } + + /// Get the basic type from header byte + pub fn get_basic_type(header_byte: u8) -> VariantBasicType { + match header_byte & 0x03 { + 0 => VariantBasicType::Primitive, + 1 => VariantBasicType::ShortString, + 2 => VariantBasicType::Object, + 3 => VariantBasicType::Array, + _ => panic!("Invalid basic type: {}", header_byte & 0x03), Review Comment: See the corresponding code in decoder.rs -- this is `unreachable!` because `i & 0x03` provably cannot produce any values outside `0..4` ########## parquet-variant-compute/src/variant_parser.rs: ########## @@ -0,0 +1,669 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or 
more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Low-level binary format parsing for variant objects + +use arrow::error::ArrowError; + +/// Basic variant type enumeration for the first 2 bits of header +#[derive(Debug, Clone, PartialEq)] +pub enum VariantBasicType { + Primitive = 0, + ShortString = 1, + Object = 2, + Array = 3, +} + +/// Primitive type variants +#[derive(Debug, Clone, PartialEq)] +pub enum PrimitiveType { + Null, + True, + False, + Int8, + Int16, + Int32, + Int64, + Double, + Decimal4, + Decimal8, + Decimal16, + Date, + TimestampNtz, + TimestampLtz, + Float, + Binary, + String, +} + +/// Variant type enumeration covering all possible types +#[derive(Debug, Clone, PartialEq)] +pub enum VariantType { + Primitive(PrimitiveType), + ShortString(ShortStringHeader), + Object(ObjectHeader), + Array(ArrayHeader), +} + +/// Short string header structure +#[derive(Debug, Clone, PartialEq)] +pub struct ShortStringHeader { + pub length: usize, +} + +/// Object header structure for variant objects +#[derive(Debug, Clone, PartialEq)] +pub struct ObjectHeader { + pub num_elements_size: usize, + pub field_id_size: usize, + pub field_offset_size: usize, + pub is_large: bool, +} + +/// Array header structure for variant objects +#[derive(Debug, Clone, PartialEq)] +pub struct 
ArrayHeader { + pub num_elements_size: usize, + pub element_offset_size: usize, + pub is_large: bool, +} + +/// Object byte offsets structure +#[derive(Debug, Clone)] +pub struct ObjectOffsets { + pub field_ids_start: usize, + pub field_offsets_start: usize, + pub values_start: usize, +} + +/// Array byte offsets structure +#[derive(Debug, Clone)] +pub struct ArrayOffsets { + pub element_offsets_start: usize, + pub elements_start: usize, +} + +/// Low-level parser for variant binary format +pub struct VariantParser; + +impl VariantParser { + /// General dispatch function to parse any variant header + pub fn parse_variant_header(header_byte: u8) -> Result<VariantType, ArrowError> { + let basic_type = Self::get_basic_type(header_byte); + + match basic_type { + VariantBasicType::Primitive => Ok(VariantType::Primitive( + Self::parse_primitive_header(header_byte)?, + )), + VariantBasicType::ShortString => Ok(VariantType::ShortString( + Self::parse_short_string_header(header_byte)?, + )), + VariantBasicType::Object => { + Ok(VariantType::Object(Self::parse_object_header(header_byte)?)) + } + VariantBasicType::Array => { + Ok(VariantType::Array(Self::parse_array_header(header_byte)?)) + } + } + } + + /// Parse primitive type header + pub fn parse_primitive_header(header_byte: u8) -> Result<PrimitiveType, ArrowError> { + let primitive_type = header_byte >> 2; + + match primitive_type { + 0 => Ok(PrimitiveType::Null), + 1 => Ok(PrimitiveType::True), + 2 => Ok(PrimitiveType::False), + 3 => Ok(PrimitiveType::Int8), + 4 => Ok(PrimitiveType::Int16), + 5 => Ok(PrimitiveType::Int32), + 6 => Ok(PrimitiveType::Int64), + 7 => Ok(PrimitiveType::Double), + 8 => Ok(PrimitiveType::Decimal4), + 9 => Ok(PrimitiveType::Decimal8), + 10 => Ok(PrimitiveType::Decimal16), + 11 => Ok(PrimitiveType::Date), + 12 => Ok(PrimitiveType::TimestampNtz), + 13 => Ok(PrimitiveType::TimestampLtz), + 14 => Ok(PrimitiveType::Float), + 15 => Ok(PrimitiveType::Binary), + 16 => Ok(PrimitiveType::String), + _ => 
Err(ArrowError::InvalidArgumentError(format!( + "Invalid primitive type: {}", + primitive_type + ))), + } + } + + /// Get the basic type from header byte + pub fn get_basic_type(header_byte: u8) -> VariantBasicType { + match header_byte & 0x03 { + 0 => VariantBasicType::Primitive, + 1 => VariantBasicType::ShortString, + 2 => VariantBasicType::Object, + 3 => VariantBasicType::Array, + _ => panic!("Invalid basic type: {}", header_byte & 0x03), + } + } + + + + /// Parse short string header + pub fn parse_short_string_header(header_byte: u8) -> Result<ShortStringHeader, ArrowError> { + let length = (header_byte >> 2) as usize; + + // Short strings can be up to 64 bytes (6-bit value: 0-63) + if length > 63 { + return Err(ArrowError::InvalidArgumentError(format!( + "Short string length {} exceeds maximum of 63", + length + ))); + } + + Ok(ShortStringHeader { length }) + } + + /// Parse object header from header byte + pub fn parse_object_header(header_byte: u8) -> Result<ObjectHeader, ArrowError> { + let value_header = header_byte >> 2; + let field_offset_size_minus_one = value_header & 0x03; + let field_id_size_minus_one = (value_header >> 2) & 0x03; + let is_large = (value_header & 0x10) != 0; + + let num_elements_size = if is_large { 4 } else { 1 }; + let field_id_size = (field_id_size_minus_one + 1) as usize; + let field_offset_size = (field_offset_size_minus_one + 1) as usize; + + Ok(ObjectHeader { + num_elements_size, + field_id_size, + field_offset_size, + is_large, + }) + } + + /// Parse array header from header byte + pub fn parse_array_header(header_byte: u8) -> Result<ArrayHeader, ArrowError> { + let value_header = header_byte >> 2; + let element_offset_size_minus_one = value_header & 0x03; + let is_large = (value_header & 0x10) != 0; + + let num_elements_size = if is_large { 4 } else { 1 }; + let element_offset_size = (element_offset_size_minus_one + 1) as usize; + + Ok(ArrayHeader { + num_elements_size, + element_offset_size, + is_large, + }) + } + + /// 
Unpack integer from bytes + pub fn unpack_int(bytes: &[u8], size: usize) -> Result<usize, ArrowError> { + if bytes.len() < size { + return Err(ArrowError::InvalidArgumentError( + "Not enough bytes to unpack integer".to_string(), + )); + } + + match size { + 1 => Ok(bytes[0] as usize), + 2 => Ok(u16::from_le_bytes([bytes[0], bytes[1]]) as usize), + 3 => Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], 0]) as usize), + 4 => Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize), + _ => Err(ArrowError::InvalidArgumentError(format!( + "Invalid integer size: {}", + size + ))), + } + } + + /// Calculate the size needed to store an integer + pub fn calculate_int_size(value: usize) -> usize { + if value <= u8::MAX as usize { + 1 + } else if value <= u16::MAX as usize { + 2 + } else if value <= 0xFFFFFF { + 3 + } else { + 4 + } Review Comment: nit: ```suggestion match value { value if value <= u8::MAX as usize => 1, value if value <= u16::MAX as usize => 2, value if value <= 0xFFFFFF => 3, _ => 4, } ``` ########## parquet-variant-compute/src/field_operations.rs: ########## @@ -0,0 +1,492 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Field extraction and removal operations for variant objects + +use crate::variant_parser::{ObjectHeader, ObjectOffsets, VariantParser}; +use arrow::error::ArrowError; +use parquet_variant::{VariantMetadata, VariantPath, VariantPathElement}; +use std::collections::HashSet; + +/// Field operations for variant objects +pub struct FieldOperations; + +impl FieldOperations { + /// Extract field bytes from a single variant object + pub fn extract_field_bytes( + metadata_bytes: &[u8], + value_bytes: &[u8], + field_name: &str, + ) -> Result<Option<Vec<u8>>, ArrowError> { + if !VariantParser::is_object(value_bytes) { + return Ok(None); + } + + let header_byte = value_bytes[0]; + let header = VariantParser::parse_object_header(header_byte)?; + let num_elements = VariantParser::unpack_int(&value_bytes[1..], header.num_elements_size)?; + let offsets = VariantParser::calculate_object_offsets(&header, num_elements); + + // Find field ID for the target field name + let target_field_id = Self::find_field_id(metadata_bytes, field_name)?; + let target_field_id = match target_field_id { + Some(id) => id, + None => return Ok(None), // Field not found + }; + + // Search for the field in the object + for i in 0..num_elements { + let field_id_offset = offsets.field_ids_start + (i * header.field_id_size); Review Comment: integer overflow panic risk on 32-bit arch ########## parquet-variant-compute/src/variant_parser.rs: ########## @@ -0,0 +1,669 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Low-level binary format parsing for variant objects + +use arrow::error::ArrowError; + +/// Basic variant type enumeration for the first 2 bits of header +#[derive(Debug, Clone, PartialEq)] +pub enum VariantBasicType { + Primitive = 0, + ShortString = 1, + Object = 2, + Array = 3, +} + +/// Primitive type variants +#[derive(Debug, Clone, PartialEq)] +pub enum PrimitiveType { + Null, + True, + False, + Int8, + Int16, + Int32, + Int64, + Double, + Decimal4, + Decimal8, + Decimal16, + Date, + TimestampNtz, + TimestampLtz, + Float, + Binary, + String, +} + +/// Variant type enumeration covering all possible types +#[derive(Debug, Clone, PartialEq)] +pub enum VariantType { + Primitive(PrimitiveType), + ShortString(ShortStringHeader), + Object(ObjectHeader), + Array(ArrayHeader), +} + +/// Short string header structure +#[derive(Debug, Clone, PartialEq)] +pub struct ShortStringHeader { + pub length: usize, +} + +/// Object header structure for variant objects +#[derive(Debug, Clone, PartialEq)] +pub struct ObjectHeader { + pub num_elements_size: usize, + pub field_id_size: usize, + pub field_offset_size: usize, + pub is_large: bool, +} + +/// Array header structure for variant objects +#[derive(Debug, Clone, PartialEq)] +pub struct ArrayHeader { + pub num_elements_size: usize, + pub element_offset_size: usize, + pub is_large: bool, +} + +/// Object byte offsets structure +#[derive(Debug, Clone)] +pub struct ObjectOffsets { + pub field_ids_start: usize, + pub field_offsets_start: usize, + pub values_start: usize, +} + +/// Array byte offsets 
structure +#[derive(Debug, Clone)] +pub struct ArrayOffsets { + pub element_offsets_start: usize, + pub elements_start: usize, +} + +/// Low-level parser for variant binary format +pub struct VariantParser; + +impl VariantParser { + /// General dispatch function to parse any variant header + pub fn parse_variant_header(header_byte: u8) -> Result<VariantType, ArrowError> { + let basic_type = Self::get_basic_type(header_byte); + + match basic_type { + VariantBasicType::Primitive => Ok(VariantType::Primitive( + Self::parse_primitive_header(header_byte)?, + )), + VariantBasicType::ShortString => Ok(VariantType::ShortString( + Self::parse_short_string_header(header_byte)?, + )), + VariantBasicType::Object => { + Ok(VariantType::Object(Self::parse_object_header(header_byte)?)) + } + VariantBasicType::Array => { + Ok(VariantType::Array(Self::parse_array_header(header_byte)?)) + } + } + } + + /// Parse primitive type header + pub fn parse_primitive_header(header_byte: u8) -> Result<PrimitiveType, ArrowError> { + let primitive_type = header_byte >> 2; + + match primitive_type { + 0 => Ok(PrimitiveType::Null), + 1 => Ok(PrimitiveType::True), + 2 => Ok(PrimitiveType::False), + 3 => Ok(PrimitiveType::Int8), + 4 => Ok(PrimitiveType::Int16), + 5 => Ok(PrimitiveType::Int32), + 6 => Ok(PrimitiveType::Int64), + 7 => Ok(PrimitiveType::Double), + 8 => Ok(PrimitiveType::Decimal4), + 9 => Ok(PrimitiveType::Decimal8), + 10 => Ok(PrimitiveType::Decimal16), + 11 => Ok(PrimitiveType::Date), + 12 => Ok(PrimitiveType::TimestampNtz), + 13 => Ok(PrimitiveType::TimestampLtz), + 14 => Ok(PrimitiveType::Float), + 15 => Ok(PrimitiveType::Binary), + 16 => Ok(PrimitiveType::String), + _ => Err(ArrowError::InvalidArgumentError(format!( + "Invalid primitive type: {}", + primitive_type + ))), + } + } + + /// Get the basic type from header byte + pub fn get_basic_type(header_byte: u8) -> VariantBasicType { + match header_byte & 0x03 { + 0 => VariantBasicType::Primitive, + 1 => 
VariantBasicType::ShortString, + 2 => VariantBasicType::Object, + 3 => VariantBasicType::Array, + _ => panic!("Invalid basic type: {}", header_byte & 0x03), + } + } + + + + /// Parse short string header + pub fn parse_short_string_header(header_byte: u8) -> Result<ShortStringHeader, ArrowError> { + let length = (header_byte >> 2) as usize; + + // Short strings can be up to 64 bytes (6-bit value: 0-63) + if length > 63 { + return Err(ArrowError::InvalidArgumentError(format!( + "Short string length {} exceeds maximum of 63", + length + ))); + } + + Ok(ShortStringHeader { length }) + } + + /// Parse object header from header byte + pub fn parse_object_header(header_byte: u8) -> Result<ObjectHeader, ArrowError> { + let value_header = header_byte >> 2; + let field_offset_size_minus_one = value_header & 0x03; + let field_id_size_minus_one = (value_header >> 2) & 0x03; + let is_large = (value_header & 0x10) != 0; + + let num_elements_size = if is_large { 4 } else { 1 }; + let field_id_size = (field_id_size_minus_one + 1) as usize; + let field_offset_size = (field_offset_size_minus_one + 1) as usize; + + Ok(ObjectHeader { + num_elements_size, + field_id_size, + field_offset_size, + is_large, + }) + } + + /// Parse array header from header byte + pub fn parse_array_header(header_byte: u8) -> Result<ArrayHeader, ArrowError> { + let value_header = header_byte >> 2; + let element_offset_size_minus_one = value_header & 0x03; + let is_large = (value_header & 0x10) != 0; + + let num_elements_size = if is_large { 4 } else { 1 }; + let element_offset_size = (element_offset_size_minus_one + 1) as usize; + + Ok(ArrayHeader { + num_elements_size, + element_offset_size, + is_large, + }) + } + + /// Unpack integer from bytes + pub fn unpack_int(bytes: &[u8], size: usize) -> Result<usize, ArrowError> { + if bytes.len() < size { + return Err(ArrowError::InvalidArgumentError( + "Not enough bytes to unpack integer".to_string(), + )); + } + + match size { + 1 => Ok(bytes[0] as usize), + 2 => 
Ok(u16::from_le_bytes([bytes[0], bytes[1]]) as usize), + 3 => Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], 0]) as usize), + 4 => Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize), + _ => Err(ArrowError::InvalidArgumentError(format!( + "Invalid integer size: {}", + size + ))), + } + } + + /// Calculate the size needed to store an integer + pub fn calculate_int_size(value: usize) -> usize { + if value <= u8::MAX as usize { + 1 + } else if value <= u16::MAX as usize { + 2 + } else if value <= 0xFFFFFF { + 3 + } else { + 4 + } + } + + /// Build object header byte + pub fn build_object_header( + is_large: bool, + field_id_size: usize, + field_offset_size: usize, + ) -> u8 { + let large_bit = if is_large { 1 } else { 0 }; + (large_bit << 6) + | (((field_id_size - 1) as u8) << 4) + | (((field_offset_size - 1) as u8) << 2) + | 2 + } + + /// Write integer bytes to buffer + pub fn write_int_bytes(buffer: &mut Vec<u8>, value: usize, size: usize) { + match size { + 1 => buffer.push(value as u8), + 2 => buffer.extend_from_slice(&(value as u16).to_le_bytes()), + 3 => { + let bytes = (value as u32).to_le_bytes(); + buffer.extend_from_slice(&bytes[..3]); + } + 4 => buffer.extend_from_slice(&(value as u32).to_le_bytes()), + _ => panic!("Invalid size: {}", size), + } + } + + + + /// Check if value bytes represent a primitive + pub fn is_primitive(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::Primitive + } + + /// Check if value bytes represent a short string + pub fn is_short_string(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::ShortString + } + + /// Check if value bytes represent an object + pub fn is_object(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::Object + } + + /// 
Check if value bytes represent an array + pub fn is_array(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::Array + } + + /// Get the data length for a primitive type + /// Returns Some(len) for fixed-length types, None for variable-length types + pub fn get_primitive_data_length(primitive_type: &PrimitiveType) -> Option<usize> { + match primitive_type { + PrimitiveType::Null | PrimitiveType::True | PrimitiveType::False => Some(0), + PrimitiveType::Int8 => Some(1), + PrimitiveType::Int16 => Some(2), + PrimitiveType::Int32 + | PrimitiveType::Float + | PrimitiveType::Decimal4 + | PrimitiveType::Date => Some(4), + PrimitiveType::Int64 + | PrimitiveType::Double + | PrimitiveType::Decimal8 + | PrimitiveType::TimestampNtz + | PrimitiveType::TimestampLtz => Some(8), + PrimitiveType::Decimal16 => Some(16), + PrimitiveType::Binary | PrimitiveType::String => None, // Variable length, need to read from data + } + } + + /// Extract short string data from value bytes + pub fn extract_short_string_data(value_bytes: &[u8]) -> Result<&[u8], ArrowError> { + if value_bytes.is_empty() { + return Err(ArrowError::InvalidArgumentError( + "Empty value bytes".to_string(), + )); + } + + let header = Self::parse_short_string_header(value_bytes[0])?; + + if value_bytes.len() < 1 + header.length { + return Err(ArrowError::InvalidArgumentError(format!( + "Short string data length {} exceeds available bytes", + header.length + ))); + } + + Ok(&value_bytes[1..1 + header.length]) + } + + /// Extract primitive data from value bytes + pub fn extract_primitive_data(value_bytes: &[u8]) -> Result<&[u8], ArrowError> { + if value_bytes.is_empty() { + return Err(ArrowError::InvalidArgumentError( + "Empty value bytes".to_string(), + )); + } + + let primitive_type = Self::parse_primitive_header(value_bytes[0])?; + let data_length = Self::get_primitive_data_length(&primitive_type); + + match data_length { + Some(0) => 
{ + // Fixed-length 0-byte types (null/true/false) + Ok(&[]) + } + Some(len) => { + // Fixed-length types with len bytes + if value_bytes.len() < 1 + len { + return Err(ArrowError::InvalidArgumentError(format!( + "Fixed length primitive data length {} exceeds available bytes", + len + ))); + } + Ok(&value_bytes[1..1 + len]) + } + None => { + // Variable-length types (binary/string) - read length from data + if value_bytes.len() < 5 { + return Err(ArrowError::InvalidArgumentError( + "Not enough bytes for variable length primitive".to_string(), + )); + } + let length = u32::from_le_bytes([ + value_bytes[1], + value_bytes[2], + value_bytes[3], + value_bytes[4], + ]) as usize; + if value_bytes.len() < 5 + length { + return Err(ArrowError::InvalidArgumentError( + "Variable length primitive data exceeds available bytes".to_string(), + )); + } + Ok(&value_bytes[5..5 + length]) Review Comment: decoder.rs has utilities that could make this code a lot simpler... ```suggestion let length_bytes = slice_from_slice_at_offset(value_bytes, 1, 0..4)?; let length = unpack_int(length_bytes, 4)?; slice_from_slice_at_offset(value_bytes, 5, 0..length) ``` ########## parquet-variant-compute/src/variant_array.rs: ########## @@ -158,13 +159,93 @@ impl VariantArray { /// Return a reference to the metadata field of the [`StructArray`] pub fn metadata_field(&self) -> &ArrayRef { // spec says fields order is not guaranteed, so we search by name - &self.metadata_ref + self.inner.column_by_name("metadata").unwrap() } /// Return a reference to the value field of the `StructArray` pub fn value_field(&self) -> &ArrayRef { // spec says fields order is not guaranteed, so we search by name - &self.value_ref + self.inner.column_by_name("value").unwrap() + } + + /// Get the metadata bytes for a specific index + pub fn metadata_bytes(&self, index: usize) -> &[u8] { + self.metadata_field().as_binary_view().value(index).as_ref() + } + + /// Get the value bytes for a specific index + pub fn value_bytes(&self,
index: usize) -> &[u8] { + self.value_field().as_binary_view().value(index).as_ref() + } + + /// Get the field names for an object at the given index + pub fn get_field_names(&self, index: usize) -> Vec<String> { + if index >= self.len() { + return vec![]; + } + + if self.is_null(index) { + return vec![]; + } + + let variant = self.value(index); + if let Some(obj) = variant.as_object() { + let mut field_names = Vec::new(); + for i in 0..obj.len() { + if let Some(field_name) = obj.field_name(i) { + field_names.push(field_name.to_string()); + } + } + field_names Review Comment: ```suggestion Vec::from_iter((0..obj.len()).map(|i| obj.field_name(i).unwrap().to_string())); ``` ########## parquet-variant-compute/src/variant_array.rs: ########## @@ -158,13 +159,93 @@ impl VariantArray { /// Return a reference to the metadata field of the [`StructArray`] pub fn metadata_field(&self) -> &ArrayRef { // spec says fields order is not guaranteed, so we search by name - &self.metadata_ref + self.inner.column_by_name("metadata").unwrap() } /// Return a reference to the value field of the `StructArray` pub fn value_field(&self) -> &ArrayRef { // spec says fields order is not guaranteed, so we search by name - &self.value_ref + self.inner.column_by_name("value").unwrap() + } + + /// Get the metadata bytes for a specific index + pub fn metadata_bytes(&self, index: usize) -> &[u8] { + self.metadata_field().as_binary_view().value(index).as_ref() + } + + /// Get the value bytes for a specific index + pub fn value_bytes(&self, index: usize) -> &[u8] { + self.value_field().as_binary_view().value(index).as_ref() + } + + /// Get the field names for an object at the given index + pub fn get_field_names(&self, index: usize) -> Vec<String> { + if index >= self.len() { + return vec![]; + } + + if self.is_null(index) { Review Comment: ```suggestion if index >= self.len() || self.is_null(index) { ``` ########## parquet-variant-compute/src/variant_parser.rs: ########## @@ -0,0 +1,669 @@ +// 
Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Low-level binary format parsing for variant objects + +use arrow::error::ArrowError; + +/// Basic variant type enumeration for the first 2 bits of header +#[derive(Debug, Clone, PartialEq)] +pub enum VariantBasicType { + Primitive = 0, + ShortString = 1, + Object = 2, + Array = 3, +} + +/// Primitive type variants +#[derive(Debug, Clone, PartialEq)] +pub enum PrimitiveType { + Null, + True, + False, + Int8, + Int16, + Int32, + Int64, + Double, + Decimal4, + Decimal8, + Decimal16, + Date, + TimestampNtz, + TimestampLtz, + Float, + Binary, + String, +} + +/// Variant type enumeration covering all possible types +#[derive(Debug, Clone, PartialEq)] +pub enum VariantType { + Primitive(PrimitiveType), + ShortString(ShortStringHeader), + Object(ObjectHeader), + Array(ArrayHeader), +} + +/// Short string header structure +#[derive(Debug, Clone, PartialEq)] +pub struct ShortStringHeader { + pub length: usize, +} + +/// Object header structure for variant objects +#[derive(Debug, Clone, PartialEq)] +pub struct ObjectHeader { + pub num_elements_size: usize, + pub field_id_size: usize, + pub field_offset_size: usize, + pub is_large: bool, +} + +/// Array header structure for 
variant objects +#[derive(Debug, Clone, PartialEq)] +pub struct ArrayHeader { + pub num_elements_size: usize, + pub element_offset_size: usize, + pub is_large: bool, +} + +/// Object byte offsets structure +#[derive(Debug, Clone)] +pub struct ObjectOffsets { + pub field_ids_start: usize, + pub field_offsets_start: usize, + pub values_start: usize, +} + +/// Array byte offsets structure +#[derive(Debug, Clone)] +pub struct ArrayOffsets { + pub element_offsets_start: usize, + pub elements_start: usize, +} + +/// Low-level parser for variant binary format +pub struct VariantParser; + +impl VariantParser { + /// General dispatch function to parse any variant header + pub fn parse_variant_header(header_byte: u8) -> Result<VariantType, ArrowError> { + let basic_type = Self::get_basic_type(header_byte); + + match basic_type { + VariantBasicType::Primitive => Ok(VariantType::Primitive( + Self::parse_primitive_header(header_byte)?, + )), + VariantBasicType::ShortString => Ok(VariantType::ShortString( + Self::parse_short_string_header(header_byte)?, + )), + VariantBasicType::Object => { + Ok(VariantType::Object(Self::parse_object_header(header_byte)?)) + } + VariantBasicType::Array => { + Ok(VariantType::Array(Self::parse_array_header(header_byte)?)) + } + } + } + + /// Parse primitive type header + pub fn parse_primitive_header(header_byte: u8) -> Result<PrimitiveType, ArrowError> { + let primitive_type = header_byte >> 2; + + match primitive_type { + 0 => Ok(PrimitiveType::Null), + 1 => Ok(PrimitiveType::True), + 2 => Ok(PrimitiveType::False), + 3 => Ok(PrimitiveType::Int8), + 4 => Ok(PrimitiveType::Int16), + 5 => Ok(PrimitiveType::Int32), + 6 => Ok(PrimitiveType::Int64), + 7 => Ok(PrimitiveType::Double), + 8 => Ok(PrimitiveType::Decimal4), + 9 => Ok(PrimitiveType::Decimal8), + 10 => Ok(PrimitiveType::Decimal16), + 11 => Ok(PrimitiveType::Date), + 12 => Ok(PrimitiveType::TimestampNtz), + 13 => Ok(PrimitiveType::TimestampLtz), + 14 => Ok(PrimitiveType::Float), + 15 => 
Ok(PrimitiveType::Binary), + 16 => Ok(PrimitiveType::String), + _ => Err(ArrowError::InvalidArgumentError(format!( + "Invalid primitive type: {}", + primitive_type + ))), + } + } + + /// Get the basic type from header byte + pub fn get_basic_type(header_byte: u8) -> VariantBasicType { + match header_byte & 0x03 { + 0 => VariantBasicType::Primitive, + 1 => VariantBasicType::ShortString, + 2 => VariantBasicType::Object, + 3 => VariantBasicType::Array, + _ => panic!("Invalid basic type: {}", header_byte & 0x03), + } + } + + + + /// Parse short string header + pub fn parse_short_string_header(header_byte: u8) -> Result<ShortStringHeader, ArrowError> { + let length = (header_byte >> 2) as usize; + + // Short strings can be up to 64 bytes (6-bit value: 0-63) + if length > 63 { + return Err(ArrowError::InvalidArgumentError(format!( + "Short string length {} exceeds maximum of 63", + length + ))); + } Review Comment: This error is impossible: operator `>>` on an unsigned type always shifts in zeros, so `header_byte >> 2` on a `u8` is at most 63 and the `length > 63` check can never fire. ########## parquet-variant-compute/src/field_operations.rs: ########## @@ -0,0 +1,492 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Field extraction and removal operations for variant objects + +use crate::variant_parser::{ObjectHeader, ObjectOffsets, VariantParser}; +use arrow::error::ArrowError; +use parquet_variant::{VariantMetadata, VariantPath, VariantPathElement}; +use std::collections::HashSet; + +/// Field operations for variant objects +pub struct FieldOperations; + +impl FieldOperations { + /// Extract field bytes from a single variant object + pub fn extract_field_bytes( + metadata_bytes: &[u8], + value_bytes: &[u8], + field_name: &str, + ) -> Result<Option<Vec<u8>>, ArrowError> { + if !VariantParser::is_object(value_bytes) { + return Ok(None); + } + + let header_byte = value_bytes[0]; + let header = VariantParser::parse_object_header(header_byte)?; + let num_elements = VariantParser::unpack_int(&value_bytes[1..], header.num_elements_size)?; + let offsets = VariantParser::calculate_object_offsets(&header, num_elements); + + // Find field ID for the target field name + let target_field_id = Self::find_field_id(metadata_bytes, field_name)?; + let target_field_id = match target_field_id { + Some(id) => id, + None => return Ok(None), // Field not found + }; Review Comment: nit: ```suggestion let Some(target_field_id) = target_field_id else { return Ok(None); // Field not found }; ``` ########## parquet-variant-compute/src/variant_parser.rs: ########## @@ -0,0 +1,669 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Low-level binary format parsing for variant objects + +use arrow::error::ArrowError; + +/// Basic variant type enumeration for the first 2 bits of header +#[derive(Debug, Clone, PartialEq)] +pub enum VariantBasicType { + Primitive = 0, + ShortString = 1, + Object = 2, + Array = 3, +} + +/// Primitive type variants +#[derive(Debug, Clone, PartialEq)] +pub enum PrimitiveType { + Null, + True, + False, + Int8, + Int16, + Int32, + Int64, + Double, + Decimal4, + Decimal8, + Decimal16, + Date, + TimestampNtz, + TimestampLtz, + Float, + Binary, + String, +} + +/// Variant type enumeration covering all possible types +#[derive(Debug, Clone, PartialEq)] +pub enum VariantType { + Primitive(PrimitiveType), + ShortString(ShortStringHeader), + Object(ObjectHeader), + Array(ArrayHeader), +} + +/// Short string header structure +#[derive(Debug, Clone, PartialEq)] +pub struct ShortStringHeader { + pub length: usize, +} + +/// Object header structure for variant objects +#[derive(Debug, Clone, PartialEq)] +pub struct ObjectHeader { + pub num_elements_size: usize, + pub field_id_size: usize, + pub field_offset_size: usize, + pub is_large: bool, +} + +/// Array header structure for variant objects +#[derive(Debug, Clone, PartialEq)] +pub struct ArrayHeader { + pub num_elements_size: usize, + pub element_offset_size: usize, + pub is_large: bool, +} + +/// Object byte offsets structure +#[derive(Debug, Clone)] +pub struct ObjectOffsets { + pub field_ids_start: usize, + pub field_offsets_start: usize, + pub values_start: usize, +} + +/// Array byte offsets 
structure +#[derive(Debug, Clone)] +pub struct ArrayOffsets { + pub element_offsets_start: usize, + pub elements_start: usize, +} + +/// Low-level parser for variant binary format +pub struct VariantParser; + +impl VariantParser { + /// General dispatch function to parse any variant header + pub fn parse_variant_header(header_byte: u8) -> Result<VariantType, ArrowError> { + let basic_type = Self::get_basic_type(header_byte); + + match basic_type { + VariantBasicType::Primitive => Ok(VariantType::Primitive( + Self::parse_primitive_header(header_byte)?, + )), + VariantBasicType::ShortString => Ok(VariantType::ShortString( + Self::parse_short_string_header(header_byte)?, + )), + VariantBasicType::Object => { + Ok(VariantType::Object(Self::parse_object_header(header_byte)?)) + } + VariantBasicType::Array => { + Ok(VariantType::Array(Self::parse_array_header(header_byte)?)) + } + } + } + + /// Parse primitive type header + pub fn parse_primitive_header(header_byte: u8) -> Result<PrimitiveType, ArrowError> { + let primitive_type = header_byte >> 2; + + match primitive_type { + 0 => Ok(PrimitiveType::Null), + 1 => Ok(PrimitiveType::True), + 2 => Ok(PrimitiveType::False), + 3 => Ok(PrimitiveType::Int8), + 4 => Ok(PrimitiveType::Int16), + 5 => Ok(PrimitiveType::Int32), + 6 => Ok(PrimitiveType::Int64), + 7 => Ok(PrimitiveType::Double), + 8 => Ok(PrimitiveType::Decimal4), + 9 => Ok(PrimitiveType::Decimal8), + 10 => Ok(PrimitiveType::Decimal16), + 11 => Ok(PrimitiveType::Date), + 12 => Ok(PrimitiveType::TimestampNtz), + 13 => Ok(PrimitiveType::TimestampLtz), + 14 => Ok(PrimitiveType::Float), + 15 => Ok(PrimitiveType::Binary), + 16 => Ok(PrimitiveType::String), + _ => Err(ArrowError::InvalidArgumentError(format!( + "Invalid primitive type: {}", + primitive_type + ))), + } + } + + /// Get the basic type from header byte + pub fn get_basic_type(header_byte: u8) -> VariantBasicType { + match header_byte & 0x03 { + 0 => VariantBasicType::Primitive, + 1 => 
VariantBasicType::ShortString, + 2 => VariantBasicType::Object, + 3 => VariantBasicType::Array, + _ => panic!("Invalid basic type: {}", header_byte & 0x03), + } + } + + + + /// Parse short string header + pub fn parse_short_string_header(header_byte: u8) -> Result<ShortStringHeader, ArrowError> { + let length = (header_byte >> 2) as usize; + + // Short strings can be up to 64 bytes (6-bit value: 0-63) + if length > 63 { + return Err(ArrowError::InvalidArgumentError(format!( + "Short string length {} exceeds maximum of 63", + length + ))); + } + + Ok(ShortStringHeader { length }) + } + + /// Parse object header from header byte + pub fn parse_object_header(header_byte: u8) -> Result<ObjectHeader, ArrowError> { + let value_header = header_byte >> 2; + let field_offset_size_minus_one = value_header & 0x03; + let field_id_size_minus_one = (value_header >> 2) & 0x03; + let is_large = (value_header & 0x10) != 0; + + let num_elements_size = if is_large { 4 } else { 1 }; + let field_id_size = (field_id_size_minus_one + 1) as usize; + let field_offset_size = (field_offset_size_minus_one + 1) as usize; + + Ok(ObjectHeader { + num_elements_size, + field_id_size, + field_offset_size, + is_large, + }) + } + + /// Parse array header from header byte + pub fn parse_array_header(header_byte: u8) -> Result<ArrayHeader, ArrowError> { + let value_header = header_byte >> 2; + let element_offset_size_minus_one = value_header & 0x03; + let is_large = (value_header & 0x10) != 0; + + let num_elements_size = if is_large { 4 } else { 1 }; + let element_offset_size = (element_offset_size_minus_one + 1) as usize; + + Ok(ArrayHeader { + num_elements_size, + element_offset_size, + is_large, + }) + } + + /// Unpack integer from bytes + pub fn unpack_int(bytes: &[u8], size: usize) -> Result<usize, ArrowError> { + if bytes.len() < size { + return Err(ArrowError::InvalidArgumentError( + "Not enough bytes to unpack integer".to_string(), + )); + } + + match size { + 1 => Ok(bytes[0] as usize), + 2 => 
Ok(u16::from_le_bytes([bytes[0], bytes[1]]) as usize), + 3 => Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], 0]) as usize), + 4 => Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize), + _ => Err(ArrowError::InvalidArgumentError(format!( + "Invalid integer size: {}", + size + ))), + } + } + + /// Calculate the size needed to store an integer + pub fn calculate_int_size(value: usize) -> usize { + if value <= u8::MAX as usize { + 1 + } else if value <= u16::MAX as usize { + 2 + } else if value <= 0xFFFFFF { + 3 + } else { + 4 + } + } + + /// Build object header byte + pub fn build_object_header( + is_large: bool, + field_id_size: usize, + field_offset_size: usize, + ) -> u8 { + let large_bit = if is_large { 1 } else { 0 }; + (large_bit << 6) + | (((field_id_size - 1) as u8) << 4) + | (((field_offset_size - 1) as u8) << 2) + | 2 + } + + /// Write integer bytes to buffer + pub fn write_int_bytes(buffer: &mut Vec<u8>, value: usize, size: usize) { + match size { + 1 => buffer.push(value as u8), + 2 => buffer.extend_from_slice(&(value as u16).to_le_bytes()), + 3 => { + let bytes = (value as u32).to_le_bytes(); + buffer.extend_from_slice(&bytes[..3]); + } + 4 => buffer.extend_from_slice(&(value as u32).to_le_bytes()), + _ => panic!("Invalid size: {}", size), + } + } + + + + /// Check if value bytes represent a primitive + pub fn is_primitive(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::Primitive + } + + /// Check if value bytes represent a short string + pub fn is_short_string(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::ShortString + } + + /// Check if value bytes represent an object + pub fn is_object(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::Object + } + + /// 
Check if value bytes represent an array + pub fn is_array(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::Array + } + + /// Get the data length for a primitive type + /// Returns Some(len) for fixed-length types, None for variable-length types + pub fn get_primitive_data_length(primitive_type: &PrimitiveType) -> Option<usize> { + match primitive_type { + PrimitiveType::Null | PrimitiveType::True | PrimitiveType::False => Some(0), + PrimitiveType::Int8 => Some(1), + PrimitiveType::Int16 => Some(2), + PrimitiveType::Int32 + | PrimitiveType::Float + | PrimitiveType::Decimal4 + | PrimitiveType::Date => Some(4), Review Comment: might be worth a `use`? ```suggestion use PrimitiveType::*; match primitive_type { Null | True | False => Some(0), Int8 => Some(1), Int16 => Some(2), Int32 | Float | Decimal4 | Date => Some(4), ``` ########## parquet-variant-compute/src/variant_parser.rs: ########## @@ -0,0 +1,669 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Low-level binary format parsing for variant objects + +use arrow::error::ArrowError; + +/// Basic variant type enumeration for the first 2 bits of header +#[derive(Debug, Clone, PartialEq)] +pub enum VariantBasicType { + Primitive = 0, + ShortString = 1, + Object = 2, + Array = 3, +} + +/// Primitive type variants +#[derive(Debug, Clone, PartialEq)] +pub enum PrimitiveType { + Null, + True, + False, + Int8, + Int16, + Int32, + Int64, + Double, + Decimal4, + Decimal8, + Decimal16, + Date, + TimestampNtz, + TimestampLtz, + Float, + Binary, + String, +} + +/// Variant type enumeration covering all possible types +#[derive(Debug, Clone, PartialEq)] +pub enum VariantType { + Primitive(PrimitiveType), + ShortString(ShortStringHeader), + Object(ObjectHeader), + Array(ArrayHeader), +} + +/// Short string header structure +#[derive(Debug, Clone, PartialEq)] +pub struct ShortStringHeader { + pub length: usize, +} + +/// Object header structure for variant objects +#[derive(Debug, Clone, PartialEq)] +pub struct ObjectHeader { + pub num_elements_size: usize, + pub field_id_size: usize, + pub field_offset_size: usize, + pub is_large: bool, +} + +/// Array header structure for variant objects +#[derive(Debug, Clone, PartialEq)] +pub struct ArrayHeader { + pub num_elements_size: usize, + pub element_offset_size: usize, + pub is_large: bool, +} + +/// Object byte offsets structure +#[derive(Debug, Clone)] +pub struct ObjectOffsets { + pub field_ids_start: usize, + pub field_offsets_start: usize, + pub values_start: usize, +} + +/// Array byte offsets structure +#[derive(Debug, Clone)] +pub struct ArrayOffsets { + pub element_offsets_start: usize, + pub elements_start: usize, +} + +/// Low-level parser for variant binary format +pub struct VariantParser; + +impl VariantParser { + /// General dispatch function to parse any variant header + pub fn parse_variant_header(header_byte: u8) -> Result<VariantType, ArrowError> { + let basic_type = Self::get_basic_type(header_byte); + + match 
basic_type { + VariantBasicType::Primitive => Ok(VariantType::Primitive( + Self::parse_primitive_header(header_byte)?, + )), + VariantBasicType::ShortString => Ok(VariantType::ShortString( + Self::parse_short_string_header(header_byte)?, + )), + VariantBasicType::Object => { + Ok(VariantType::Object(Self::parse_object_header(header_byte)?)) + } + VariantBasicType::Array => { + Ok(VariantType::Array(Self::parse_array_header(header_byte)?)) + } + } + } + + /// Parse primitive type header + pub fn parse_primitive_header(header_byte: u8) -> Result<PrimitiveType, ArrowError> { + let primitive_type = header_byte >> 2; + + match primitive_type { + 0 => Ok(PrimitiveType::Null), + 1 => Ok(PrimitiveType::True), + 2 => Ok(PrimitiveType::False), + 3 => Ok(PrimitiveType::Int8), + 4 => Ok(PrimitiveType::Int16), + 5 => Ok(PrimitiveType::Int32), + 6 => Ok(PrimitiveType::Int64), + 7 => Ok(PrimitiveType::Double), + 8 => Ok(PrimitiveType::Decimal4), + 9 => Ok(PrimitiveType::Decimal8), + 10 => Ok(PrimitiveType::Decimal16), + 11 => Ok(PrimitiveType::Date), + 12 => Ok(PrimitiveType::TimestampNtz), + 13 => Ok(PrimitiveType::TimestampLtz), + 14 => Ok(PrimitiveType::Float), + 15 => Ok(PrimitiveType::Binary), + 16 => Ok(PrimitiveType::String), + _ => Err(ArrowError::InvalidArgumentError(format!( + "Invalid primitive type: {}", + primitive_type + ))), + } + } + + /// Get the basic type from header byte + pub fn get_basic_type(header_byte: u8) -> VariantBasicType { + match header_byte & 0x03 { + 0 => VariantBasicType::Primitive, + 1 => VariantBasicType::ShortString, + 2 => VariantBasicType::Object, + 3 => VariantBasicType::Array, + _ => panic!("Invalid basic type: {}", header_byte & 0x03), + } + } + + + + /// Parse short string header + pub fn parse_short_string_header(header_byte: u8) -> Result<ShortStringHeader, ArrowError> { + let length = (header_byte >> 2) as usize; + + // Short strings can be up to 64 bytes (6-bit value: 0-63) + if length > 63 { + return 
Err(ArrowError::InvalidArgumentError(format!( + "Short string length {} exceeds maximum of 63", + length + ))); + } + + Ok(ShortStringHeader { length }) + } + + /// Parse object header from header byte + pub fn parse_object_header(header_byte: u8) -> Result<ObjectHeader, ArrowError> { + let value_header = header_byte >> 2; + let field_offset_size_minus_one = value_header & 0x03; + let field_id_size_minus_one = (value_header >> 2) & 0x03; + let is_large = (value_header & 0x10) != 0; + + let num_elements_size = if is_large { 4 } else { 1 }; + let field_id_size = (field_id_size_minus_one + 1) as usize; + let field_offset_size = (field_offset_size_minus_one + 1) as usize; + + Ok(ObjectHeader { + num_elements_size, + field_id_size, + field_offset_size, + is_large, + }) + } + + /// Parse array header from header byte + pub fn parse_array_header(header_byte: u8) -> Result<ArrayHeader, ArrowError> { + let value_header = header_byte >> 2; + let element_offset_size_minus_one = value_header & 0x03; + let is_large = (value_header & 0x10) != 0; + + let num_elements_size = if is_large { 4 } else { 1 }; + let element_offset_size = (element_offset_size_minus_one + 1) as usize; + + Ok(ArrayHeader { + num_elements_size, + element_offset_size, + is_large, + }) + } + + /// Unpack integer from bytes + pub fn unpack_int(bytes: &[u8], size: usize) -> Result<usize, ArrowError> { + if bytes.len() < size { + return Err(ArrowError::InvalidArgumentError( + "Not enough bytes to unpack integer".to_string(), + )); + } + + match size { + 1 => Ok(bytes[0] as usize), + 2 => Ok(u16::from_le_bytes([bytes[0], bytes[1]]) as usize), + 3 => Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], 0]) as usize), + 4 => Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize), + _ => Err(ArrowError::InvalidArgumentError(format!( + "Invalid integer size: {}", + size + ))), + } + } + + /// Calculate the size needed to store an integer + pub fn calculate_int_size(value: usize) -> usize { + if 
value <= u8::MAX as usize { + 1 + } else if value <= u16::MAX as usize { + 2 + } else if value <= 0xFFFFFF { + 3 + } else { + 4 + } + } + + /// Build object header byte + pub fn build_object_header( + is_large: bool, + field_id_size: usize, + field_offset_size: usize, + ) -> u8 { + let large_bit = if is_large { 1 } else { 0 }; + (large_bit << 6) + | (((field_id_size - 1) as u8) << 4) + | (((field_offset_size - 1) as u8) << 2) + | 2 + } + + /// Write integer bytes to buffer + pub fn write_int_bytes(buffer: &mut Vec<u8>, value: usize, size: usize) { + match size { + 1 => buffer.push(value as u8), + 2 => buffer.extend_from_slice(&(value as u16).to_le_bytes()), + 3 => { + let bytes = (value as u32).to_le_bytes(); + buffer.extend_from_slice(&bytes[..3]); + } + 4 => buffer.extend_from_slice(&(value as u32).to_le_bytes()), + _ => panic!("Invalid size: {}", size), + } + } + + + + /// Check if value bytes represent a primitive + pub fn is_primitive(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::Primitive + } + + /// Check if value bytes represent a short string + pub fn is_short_string(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::ShortString Review Comment: nit: ```suggestion value_bytes.get(0).is_some_and(|header| { Self::get_basic_type(header) == VariantBasicType::ShortString }) ``` (more below, if you like this idea) ########## parquet-variant-compute/src/variant_parser.rs: ########## @@ -0,0 +1,669 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Low-level binary format parsing for variant objects + +use arrow::error::ArrowError; + +/// Basic variant type enumeration for the first 2 bits of header +#[derive(Debug, Clone, PartialEq)] +pub enum VariantBasicType { + Primitive = 0, + ShortString = 1, + Object = 2, + Array = 3, +} + +/// Primitive type variants +#[derive(Debug, Clone, PartialEq)] +pub enum PrimitiveType { + Null, + True, + False, + Int8, + Int16, + Int32, + Int64, + Double, + Decimal4, + Decimal8, + Decimal16, + Date, + TimestampNtz, + TimestampLtz, + Float, + Binary, + String, +} + +/// Variant type enumeration covering all possible types +#[derive(Debug, Clone, PartialEq)] +pub enum VariantType { + Primitive(PrimitiveType), + ShortString(ShortStringHeader), + Object(ObjectHeader), + Array(ArrayHeader), +} + +/// Short string header structure +#[derive(Debug, Clone, PartialEq)] +pub struct ShortStringHeader { + pub length: usize, +} + +/// Object header structure for variant objects +#[derive(Debug, Clone, PartialEq)] +pub struct ObjectHeader { + pub num_elements_size: usize, + pub field_id_size: usize, + pub field_offset_size: usize, + pub is_large: bool, +} + +/// Array header structure for variant objects +#[derive(Debug, Clone, PartialEq)] +pub struct ArrayHeader { + pub num_elements_size: usize, + pub element_offset_size: usize, + pub is_large: bool, +} + +/// Object byte offsets structure +#[derive(Debug, Clone)] +pub struct ObjectOffsets { + pub field_ids_start: usize, + pub field_offsets_start: usize, + pub values_start: usize, +} + +/// Array byte offsets 
structure +#[derive(Debug, Clone)] +pub struct ArrayOffsets { + pub element_offsets_start: usize, + pub elements_start: usize, +} + +/// Low-level parser for variant binary format +pub struct VariantParser; + +impl VariantParser { + /// General dispatch function to parse any variant header + pub fn parse_variant_header(header_byte: u8) -> Result<VariantType, ArrowError> { + let basic_type = Self::get_basic_type(header_byte); + + match basic_type { + VariantBasicType::Primitive => Ok(VariantType::Primitive( + Self::parse_primitive_header(header_byte)?, + )), + VariantBasicType::ShortString => Ok(VariantType::ShortString( + Self::parse_short_string_header(header_byte)?, + )), + VariantBasicType::Object => { + Ok(VariantType::Object(Self::parse_object_header(header_byte)?)) + } + VariantBasicType::Array => { + Ok(VariantType::Array(Self::parse_array_header(header_byte)?)) + } + } + } + + /// Parse primitive type header + pub fn parse_primitive_header(header_byte: u8) -> Result<PrimitiveType, ArrowError> { + let primitive_type = header_byte >> 2; + + match primitive_type { + 0 => Ok(PrimitiveType::Null), + 1 => Ok(PrimitiveType::True), + 2 => Ok(PrimitiveType::False), + 3 => Ok(PrimitiveType::Int8), + 4 => Ok(PrimitiveType::Int16), + 5 => Ok(PrimitiveType::Int32), + 6 => Ok(PrimitiveType::Int64), + 7 => Ok(PrimitiveType::Double), + 8 => Ok(PrimitiveType::Decimal4), + 9 => Ok(PrimitiveType::Decimal8), + 10 => Ok(PrimitiveType::Decimal16), + 11 => Ok(PrimitiveType::Date), + 12 => Ok(PrimitiveType::TimestampNtz), + 13 => Ok(PrimitiveType::TimestampLtz), + 14 => Ok(PrimitiveType::Float), + 15 => Ok(PrimitiveType::Binary), + 16 => Ok(PrimitiveType::String), + _ => Err(ArrowError::InvalidArgumentError(format!( + "Invalid primitive type: {}", + primitive_type + ))), + } + } + + /// Get the basic type from header byte + pub fn get_basic_type(header_byte: u8) -> VariantBasicType { + match header_byte & 0x03 { + 0 => VariantBasicType::Primitive, + 1 => 
VariantBasicType::ShortString, + 2 => VariantBasicType::Object, + 3 => VariantBasicType::Array, + _ => panic!("Invalid basic type: {}", header_byte & 0x03), + } + } + + + + /// Parse short string header + pub fn parse_short_string_header(header_byte: u8) -> Result<ShortStringHeader, ArrowError> { + let length = (header_byte >> 2) as usize; + + // Short strings can be up to 64 bytes (6-bit value: 0-63) + if length > 63 { + return Err(ArrowError::InvalidArgumentError(format!( + "Short string length {} exceeds maximum of 63", + length + ))); + } + + Ok(ShortStringHeader { length }) + } + + /// Parse object header from header byte + pub fn parse_object_header(header_byte: u8) -> Result<ObjectHeader, ArrowError> { + let value_header = header_byte >> 2; + let field_offset_size_minus_one = value_header & 0x03; + let field_id_size_minus_one = (value_header >> 2) & 0x03; + let is_large = (value_header & 0x10) != 0; + + let num_elements_size = if is_large { 4 } else { 1 }; + let field_id_size = (field_id_size_minus_one + 1) as usize; + let field_offset_size = (field_offset_size_minus_one + 1) as usize; + + Ok(ObjectHeader { + num_elements_size, + field_id_size, + field_offset_size, + is_large, + }) + } + + /// Parse array header from header byte + pub fn parse_array_header(header_byte: u8) -> Result<ArrayHeader, ArrowError> { + let value_header = header_byte >> 2; + let element_offset_size_minus_one = value_header & 0x03; + let is_large = (value_header & 0x10) != 0; + + let num_elements_size = if is_large { 4 } else { 1 }; + let element_offset_size = (element_offset_size_minus_one + 1) as usize; + + Ok(ArrayHeader { + num_elements_size, + element_offset_size, + is_large, + }) + } + + /// Unpack integer from bytes + pub fn unpack_int(bytes: &[u8], size: usize) -> Result<usize, ArrowError> { + if bytes.len() < size { + return Err(ArrowError::InvalidArgumentError( + "Not enough bytes to unpack integer".to_string(), + )); + } + + match size { + 1 => Ok(bytes[0] as usize), + 2 => 
Ok(u16::from_le_bytes([bytes[0], bytes[1]]) as usize), + 3 => Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], 0]) as usize), + 4 => Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize), + _ => Err(ArrowError::InvalidArgumentError(format!( + "Invalid integer size: {}", + size + ))), + } + } + + /// Calculate the size needed to store an integer + pub fn calculate_int_size(value: usize) -> usize { + if value <= u8::MAX as usize { + 1 + } else if value <= u16::MAX as usize { + 2 + } else if value <= 0xFFFFFF { + 3 + } else { + 4 + } + } + + /// Build object header byte + pub fn build_object_header( + is_large: bool, + field_id_size: usize, + field_offset_size: usize, + ) -> u8 { + let large_bit = if is_large { 1 } else { 0 }; + (large_bit << 6) + | (((field_id_size - 1) as u8) << 4) + | (((field_offset_size - 1) as u8) << 2) + | 2 + } + + /// Write integer bytes to buffer + pub fn write_int_bytes(buffer: &mut Vec<u8>, value: usize, size: usize) { + match size { + 1 => buffer.push(value as u8), + 2 => buffer.extend_from_slice(&(value as u16).to_le_bytes()), + 3 => { + let bytes = (value as u32).to_le_bytes(); + buffer.extend_from_slice(&bytes[..3]); + } + 4 => buffer.extend_from_slice(&(value as u32).to_le_bytes()), + _ => panic!("Invalid size: {}", size), + } + } + + + + /// Check if value bytes represent a primitive + pub fn is_primitive(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::Primitive + } + + /// Check if value bytes represent a short string + pub fn is_short_string(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::ShortString + } + + /// Check if value bytes represent an object + pub fn is_object(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::Object + } + + /// 
Check if value bytes represent an array + pub fn is_array(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::Array + } + + /// Get the data length for a primitive type + /// Returns Some(len) for fixed-length types, None for variable-length types + pub fn get_primitive_data_length(primitive_type: &PrimitiveType) -> Option<usize> { + match primitive_type { + PrimitiveType::Null | PrimitiveType::True | PrimitiveType::False => Some(0), + PrimitiveType::Int8 => Some(1), + PrimitiveType::Int16 => Some(2), + PrimitiveType::Int32 + | PrimitiveType::Float + | PrimitiveType::Decimal4 + | PrimitiveType::Date => Some(4), + PrimitiveType::Int64 + | PrimitiveType::Double + | PrimitiveType::Decimal8 + | PrimitiveType::TimestampNtz + | PrimitiveType::TimestampLtz => Some(8), + PrimitiveType::Decimal16 => Some(16), + PrimitiveType::Binary | PrimitiveType::String => None, // Variable length, need to read from data + } + } + + /// Extract short string data from value bytes + pub fn extract_short_string_data(value_bytes: &[u8]) -> Result<&[u8], ArrowError> { + if value_bytes.is_empty() { + return Err(ArrowError::InvalidArgumentError( + "Empty value bytes".to_string(), + )); + } + + let header = Self::parse_short_string_header(value_bytes[0])?; + + if value_bytes.len() < 1 + header.length { + return Err(ArrowError::InvalidArgumentError(format!( + "Short string data length {} exceeds available bytes", + header.length + ))); + } + + Ok(&value_bytes[1..1 + header.length]) + } + + /// Extract primitive data from value bytes + pub fn extract_primitive_data(value_bytes: &[u8]) -> Result<&[u8], ArrowError> { + if value_bytes.is_empty() { + return Err(ArrowError::InvalidArgumentError( + "Empty value bytes".to_string(), + )); + } + + let primitive_type = Self::parse_primitive_header(value_bytes[0])?; + let data_length = Self::get_primitive_data_length(&primitive_type); + + match data_length { + Some(0) => 
{ + // Fixed-length 0-byte types (null/true/false) + Ok(&[]) + } + Some(len) => { + // Fixed-length types with len bytes + if value_bytes.len() < 1 + len { + return Err(ArrowError::InvalidArgumentError(format!( + "Fixed length primitive data length {} exceeds available bytes", + len + ))); + } + Ok(&value_bytes[1..1 + len]) + } Review Comment: ```suggestion Some(len) => slice_from_slice_at_offset(value_bytes, 1, 0..len), ``` ########## parquet-variant-compute/src/field_operations.rs: ########## @@ -0,0 +1,492 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//!
Field extraction and removal operations for variant objects + +use crate::variant_parser::{ObjectHeader, ObjectOffsets, VariantParser}; +use arrow::error::ArrowError; +use parquet_variant::{VariantMetadata, VariantPath, VariantPathElement}; +use std::collections::HashSet; + +/// Field operations for variant objects +pub struct FieldOperations; + +impl FieldOperations { + /// Extract field bytes from a single variant object + pub fn extract_field_bytes( + metadata_bytes: &[u8], + value_bytes: &[u8], + field_name: &str, + ) -> Result<Option<Vec<u8>>, ArrowError> { + if !VariantParser::is_object(value_bytes) { + return Ok(None); + } + + let header_byte = value_bytes[0]; + let header = VariantParser::parse_object_header(header_byte)?; + let num_elements = VariantParser::unpack_int(&value_bytes[1..], header.num_elements_size)?; + let offsets = VariantParser::calculate_object_offsets(&header, num_elements); + + // Find field ID for the target field name + let target_field_id = Self::find_field_id(metadata_bytes, field_name)?; + let target_field_id = match target_field_id { + Some(id) => id, + None => return Ok(None), // Field not found + }; + + // Search for the field in the object + for i in 0..num_elements { Review Comment: This is both incorrect (non-ordered metadata is allowed to have duplicates) and inefficient (large objects should be searched with a binary search rather than a linear scan). Binary search could be future work, but we need to handle duplicates correctly. The parquet-variant crate provides binary-search utility code to make that part easier. If this were a `VariantObject`, the field lookup would just be a method call, which already chooses between linear and binary search automatically. ########## parquet-variant-compute/src/variant_parser.rs: ########## @@ -0,0 +1,669 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Low-level binary format parsing for variant objects + +use arrow::error::ArrowError; + +/// Basic variant type enumeration for the first 2 bits of header +#[derive(Debug, Clone, PartialEq)] +pub enum VariantBasicType { + Primitive = 0, + ShortString = 1, + Object = 2, + Array = 3, +} + +/// Primitive type variants +#[derive(Debug, Clone, PartialEq)] +pub enum PrimitiveType { + Null, + True, + False, + Int8, + Int16, + Int32, + Int64, + Double, + Decimal4, + Decimal8, + Decimal16, + Date, + TimestampNtz, + TimestampLtz, + Float, + Binary, + String, +} + +/// Variant type enumeration covering all possible types +#[derive(Debug, Clone, PartialEq)] +pub enum VariantType { + Primitive(PrimitiveType), + ShortString(ShortStringHeader), + Object(ObjectHeader), + Array(ArrayHeader), +} + +/// Short string header structure +#[derive(Debug, Clone, PartialEq)] +pub struct ShortStringHeader { + pub length: usize, +} + +/// Object header structure for variant objects +#[derive(Debug, Clone, PartialEq)] +pub struct ObjectHeader { + pub num_elements_size: usize, + pub field_id_size: usize, + pub field_offset_size: usize, + pub is_large: bool, +} + +/// Array header structure for variant objects +#[derive(Debug, Clone, PartialEq)] +pub struct ArrayHeader { + pub num_elements_size: usize, + pub element_offset_size: usize, + pub is_large: bool, +} + +/// Object byte offsets structure 
+#[derive(Debug, Clone)] +pub struct ObjectOffsets { + pub field_ids_start: usize, + pub field_offsets_start: usize, + pub values_start: usize, +} + +/// Array byte offsets structure +#[derive(Debug, Clone)] +pub struct ArrayOffsets { + pub element_offsets_start: usize, + pub elements_start: usize, +} + +/// Low-level parser for variant binary format +pub struct VariantParser; + +impl VariantParser { + /// General dispatch function to parse any variant header + pub fn parse_variant_header(header_byte: u8) -> Result<VariantType, ArrowError> { + let basic_type = Self::get_basic_type(header_byte); + + match basic_type { + VariantBasicType::Primitive => Ok(VariantType::Primitive( + Self::parse_primitive_header(header_byte)?, + )), + VariantBasicType::ShortString => Ok(VariantType::ShortString( + Self::parse_short_string_header(header_byte)?, + )), + VariantBasicType::Object => { + Ok(VariantType::Object(Self::parse_object_header(header_byte)?)) + } + VariantBasicType::Array => { + Ok(VariantType::Array(Self::parse_array_header(header_byte)?)) + } + } + } + + /// Parse primitive type header + pub fn parse_primitive_header(header_byte: u8) -> Result<PrimitiveType, ArrowError> { + let primitive_type = header_byte >> 2; + + match primitive_type { + 0 => Ok(PrimitiveType::Null), + 1 => Ok(PrimitiveType::True), + 2 => Ok(PrimitiveType::False), + 3 => Ok(PrimitiveType::Int8), + 4 => Ok(PrimitiveType::Int16), + 5 => Ok(PrimitiveType::Int32), + 6 => Ok(PrimitiveType::Int64), + 7 => Ok(PrimitiveType::Double), + 8 => Ok(PrimitiveType::Decimal4), + 9 => Ok(PrimitiveType::Decimal8), + 10 => Ok(PrimitiveType::Decimal16), + 11 => Ok(PrimitiveType::Date), + 12 => Ok(PrimitiveType::TimestampNtz), + 13 => Ok(PrimitiveType::TimestampLtz), + 14 => Ok(PrimitiveType::Float), + 15 => Ok(PrimitiveType::Binary), + 16 => Ok(PrimitiveType::String), + _ => Err(ArrowError::InvalidArgumentError(format!( + "Invalid primitive type: {}", + primitive_type + ))), + } + } + + /// Get the basic type from 
header byte + pub fn get_basic_type(header_byte: u8) -> VariantBasicType { + match header_byte & 0x03 { + 0 => VariantBasicType::Primitive, + 1 => VariantBasicType::ShortString, + 2 => VariantBasicType::Object, + 3 => VariantBasicType::Array, + _ => panic!("Invalid basic type: {}", header_byte & 0x03), + } + } + + + + /// Parse short string header + pub fn parse_short_string_header(header_byte: u8) -> Result<ShortStringHeader, ArrowError> { + let length = (header_byte >> 2) as usize; + + // Short strings can be up to 64 bytes (6-bit value: 0-63) + if length > 63 { + return Err(ArrowError::InvalidArgumentError(format!( + "Short string length {} exceeds maximum of 63", + length + ))); + } + + Ok(ShortStringHeader { length }) + } + + /// Parse object header from header byte + pub fn parse_object_header(header_byte: u8) -> Result<ObjectHeader, ArrowError> { + let value_header = header_byte >> 2; + let field_offset_size_minus_one = value_header & 0x03; + let field_id_size_minus_one = (value_header >> 2) & 0x03; + let is_large = (value_header & 0x10) != 0; + + let num_elements_size = if is_large { 4 } else { 1 }; + let field_id_size = (field_id_size_minus_one + 1) as usize; + let field_offset_size = (field_offset_size_minus_one + 1) as usize; + + Ok(ObjectHeader { + num_elements_size, + field_id_size, + field_offset_size, + is_large, + }) + } + + /// Parse array header from header byte + pub fn parse_array_header(header_byte: u8) -> Result<ArrayHeader, ArrowError> { + let value_header = header_byte >> 2; + let element_offset_size_minus_one = value_header & 0x03; + let is_large = (value_header & 0x10) != 0; + + let num_elements_size = if is_large { 4 } else { 1 }; + let element_offset_size = (element_offset_size_minus_one + 1) as usize; + + Ok(ArrayHeader { + num_elements_size, + element_offset_size, + is_large, + }) + } + + /// Unpack integer from bytes + pub fn unpack_int(bytes: &[u8], size: usize) -> Result<usize, ArrowError> { + if bytes.len() < size { + return 
Err(ArrowError::InvalidArgumentError( + "Not enough bytes to unpack integer".to_string(), + )); + } + + match size { + 1 => Ok(bytes[0] as usize), + 2 => Ok(u16::from_le_bytes([bytes[0], bytes[1]]) as usize), + 3 => Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], 0]) as usize), + 4 => Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize), + _ => Err(ArrowError::InvalidArgumentError(format!( + "Invalid integer size: {}", + size + ))), + } + } + + /// Calculate the size needed to store an integer + pub fn calculate_int_size(value: usize) -> usize { + if value <= u8::MAX as usize { + 1 + } else if value <= u16::MAX as usize { + 2 + } else if value <= 0xFFFFFF { + 3 + } else { + 4 + } + } + + /// Build object header byte + pub fn build_object_header( + is_large: bool, + field_id_size: usize, + field_offset_size: usize, + ) -> u8 { + let large_bit = if is_large { 1 } else { 0 }; + (large_bit << 6) + | (((field_id_size - 1) as u8) << 4) + | (((field_offset_size - 1) as u8) << 2) + | 2 + } + + /// Write integer bytes to buffer + pub fn write_int_bytes(buffer: &mut Vec<u8>, value: usize, size: usize) { + match size { + 1 => buffer.push(value as u8), + 2 => buffer.extend_from_slice(&(value as u16).to_le_bytes()), + 3 => { + let bytes = (value as u32).to_le_bytes(); + buffer.extend_from_slice(&bytes[..3]); + } + 4 => buffer.extend_from_slice(&(value as u32).to_le_bytes()), + _ => panic!("Invalid size: {}", size), + } + } + + + + /// Check if value bytes represent a primitive + pub fn is_primitive(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::Primitive + } + + /// Check if value bytes represent a short string + pub fn is_short_string(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::ShortString + } + + /// Check if value bytes represent an object + pub fn 
is_object(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::Object + } + + /// Check if value bytes represent an array + pub fn is_array(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::Array + } + + /// Get the data length for a primitive type + /// Returns Some(len) for fixed-length types, None for variable-length types + pub fn get_primitive_data_length(primitive_type: &PrimitiveType) -> Option<usize> { + match primitive_type { + PrimitiveType::Null | PrimitiveType::True | PrimitiveType::False => Some(0), + PrimitiveType::Int8 => Some(1), + PrimitiveType::Int16 => Some(2), + PrimitiveType::Int32 + | PrimitiveType::Float + | PrimitiveType::Decimal4 + | PrimitiveType::Date => Some(4), + PrimitiveType::Int64 + | PrimitiveType::Double + | PrimitiveType::Decimal8 + | PrimitiveType::TimestampNtz + | PrimitiveType::TimestampLtz => Some(8), + PrimitiveType::Decimal16 => Some(16), + PrimitiveType::Binary | PrimitiveType::String => None, // Variable length, need to read from data + } + } + + /// Extract short string data from value bytes + pub fn extract_short_string_data(value_bytes: &[u8]) -> Result<&[u8], ArrowError> { + if value_bytes.is_empty() { + return Err(ArrowError::InvalidArgumentError( + "Empty value bytes".to_string(), + )); + } + + let header = Self::parse_short_string_header(value_bytes[0])?; + + if value_bytes.len() < 1 + header.length { + return Err(ArrowError::InvalidArgumentError(format!( + "Short string data length {} exceeds available bytes", + header.length + ))); + } + + Ok(&value_bytes[1..1 + header.length]) Review Comment: ```suggestion let Some(&header) = value_bytes.first() else { return Err(ArrowError::InvalidArgumentError( "Empty value bytes".to_string(), )); }; let header = Self::parse_short_string_header(header)?; 
slice_from_slice_at_offset(value_bytes, 1, 0..header.length) ```
(simpler, and avoids integer overflow panic risk on 32-bit arch) ########## parquet-variant-compute/src/variant_array.rs: ########## @@ -158,13 +159,93 @@ impl VariantArray { /// Return a reference to the metadata field of the [`StructArray`] pub fn metadata_field(&self) -> &ArrayRef { // spec says fields order is not guaranteed, so we search by name - &self.metadata_ref + self.inner.column_by_name("metadata").unwrap() } /// Return a reference to the value field of the `StructArray` pub fn value_field(&self) -> &ArrayRef { // spec says fields order is not guaranteed, so we search by name - &self.value_ref + self.inner.column_by_name("value").unwrap() + } + + /// Get the metadata bytes for a specific index + pub fn metadata_bytes(&self, index: usize) -> &[u8] { + self.metadata_field().as_binary_view().value(index).as_ref() + } + + /// Get the value bytes for a specific index + pub fn value_bytes(&self, index: usize) -> &[u8] { + self.value_field().as_binary_view().value(index).as_ref() + } + + /// Get the field names for an object at the given index + pub fn get_field_names(&self, index: usize) -> Vec<String> { + if index >= self.len() { + return vec![]; + } + + if self.is_null(index) { Review Comment: Actually, doesn't the `value` call below already handle the is-null case by returning None? ########## parquet-variant-compute/src/variant_parser.rs: ########## @@ -0,0 +1,669 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Low-level binary format parsing for variant objects + +use arrow::error::ArrowError; + +/// Basic variant type enumeration for the first 2 bits of header +#[derive(Debug, Clone, PartialEq)] +pub enum VariantBasicType { + Primitive = 0, + ShortString = 1, + Object = 2, + Array = 3, +} + +/// Primitive type variants +#[derive(Debug, Clone, PartialEq)] +pub enum PrimitiveType { + Null, + True, + False, + Int8, + Int16, + Int32, + Int64, + Double, + Decimal4, + Decimal8, + Decimal16, + Date, + TimestampNtz, + TimestampLtz, + Float, + Binary, + String, +} + +/// Variant type enumeration covering all possible types +#[derive(Debug, Clone, PartialEq)] +pub enum VariantType { + Primitive(PrimitiveType), + ShortString(ShortStringHeader), + Object(ObjectHeader), + Array(ArrayHeader), +} + +/// Short string header structure +#[derive(Debug, Clone, PartialEq)] +pub struct ShortStringHeader { + pub length: usize, +} + +/// Object header structure for variant objects +#[derive(Debug, Clone, PartialEq)] +pub struct ObjectHeader { + pub num_elements_size: usize, + pub field_id_size: usize, + pub field_offset_size: usize, + pub is_large: bool, +} + +/// Array header structure for variant objects +#[derive(Debug, Clone, PartialEq)] +pub struct ArrayHeader { + pub num_elements_size: usize, + pub element_offset_size: usize, + pub is_large: bool, +} + +/// Object byte offsets structure +#[derive(Debug, Clone)] +pub struct ObjectOffsets { + pub field_ids_start: usize, + pub field_offsets_start: usize, + pub values_start: usize, +} + +/// Array byte offsets 
structure +#[derive(Debug, Clone)] +pub struct ArrayOffsets { + pub element_offsets_start: usize, + pub elements_start: usize, +} + +/// Low-level parser for variant binary format +pub struct VariantParser; + +impl VariantParser { + /// General dispatch function to parse any variant header + pub fn parse_variant_header(header_byte: u8) -> Result<VariantType, ArrowError> { + let basic_type = Self::get_basic_type(header_byte); + + match basic_type { + VariantBasicType::Primitive => Ok(VariantType::Primitive( + Self::parse_primitive_header(header_byte)?, + )), + VariantBasicType::ShortString => Ok(VariantType::ShortString( + Self::parse_short_string_header(header_byte)?, + )), + VariantBasicType::Object => { + Ok(VariantType::Object(Self::parse_object_header(header_byte)?)) + } + VariantBasicType::Array => { + Ok(VariantType::Array(Self::parse_array_header(header_byte)?)) + } + } + } + + /// Parse primitive type header + pub fn parse_primitive_header(header_byte: u8) -> Result<PrimitiveType, ArrowError> { + let primitive_type = header_byte >> 2; + + match primitive_type { + 0 => Ok(PrimitiveType::Null), + 1 => Ok(PrimitiveType::True), + 2 => Ok(PrimitiveType::False), + 3 => Ok(PrimitiveType::Int8), + 4 => Ok(PrimitiveType::Int16), + 5 => Ok(PrimitiveType::Int32), + 6 => Ok(PrimitiveType::Int64), + 7 => Ok(PrimitiveType::Double), + 8 => Ok(PrimitiveType::Decimal4), + 9 => Ok(PrimitiveType::Decimal8), + 10 => Ok(PrimitiveType::Decimal16), + 11 => Ok(PrimitiveType::Date), + 12 => Ok(PrimitiveType::TimestampNtz), + 13 => Ok(PrimitiveType::TimestampLtz), + 14 => Ok(PrimitiveType::Float), + 15 => Ok(PrimitiveType::Binary), + 16 => Ok(PrimitiveType::String), + _ => Err(ArrowError::InvalidArgumentError(format!( + "Invalid primitive type: {}", + primitive_type + ))), + } + } + + /// Get the basic type from header byte + pub fn get_basic_type(header_byte: u8) -> VariantBasicType { + match header_byte & 0x03 { + 0 => VariantBasicType::Primitive, + 1 => 
VariantBasicType::ShortString, + 2 => VariantBasicType::Object, + 3 => VariantBasicType::Array, + _ => panic!("Invalid basic type: {}", header_byte & 0x03), + } + } + + + + /// Parse short string header + pub fn parse_short_string_header(header_byte: u8) -> Result<ShortStringHeader, ArrowError> { + let length = (header_byte >> 2) as usize; + + // Short strings can be up to 64 bytes (6-bit value: 0-63) + if length > 63 { + return Err(ArrowError::InvalidArgumentError(format!( + "Short string length {} exceeds maximum of 63", + length + ))); + } + + Ok(ShortStringHeader { length }) + } + + /// Parse object header from header byte + pub fn parse_object_header(header_byte: u8) -> Result<ObjectHeader, ArrowError> { + let value_header = header_byte >> 2; + let field_offset_size_minus_one = value_header & 0x03; + let field_id_size_minus_one = (value_header >> 2) & 0x03; + let is_large = (value_header & 0x10) != 0; + + let num_elements_size = if is_large { 4 } else { 1 }; + let field_id_size = (field_id_size_minus_one + 1) as usize; + let field_offset_size = (field_offset_size_minus_one + 1) as usize; + + Ok(ObjectHeader { + num_elements_size, + field_id_size, + field_offset_size, + is_large, + }) + } + + /// Parse array header from header byte + pub fn parse_array_header(header_byte: u8) -> Result<ArrayHeader, ArrowError> { + let value_header = header_byte >> 2; + let element_offset_size_minus_one = value_header & 0x03; + let is_large = (value_header & 0x10) != 0; + + let num_elements_size = if is_large { 4 } else { 1 }; + let element_offset_size = (element_offset_size_minus_one + 1) as usize; + + Ok(ArrayHeader { + num_elements_size, + element_offset_size, + is_large, + }) + } + + /// Unpack integer from bytes + pub fn unpack_int(bytes: &[u8], size: usize) -> Result<usize, ArrowError> { + if bytes.len() < size { + return Err(ArrowError::InvalidArgumentError( + "Not enough bytes to unpack integer".to_string(), + )); + } + + match size { + 1 => Ok(bytes[0] as usize), + 2 => 
Ok(u16::from_le_bytes([bytes[0], bytes[1]]) as usize), + 3 => Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], 0]) as usize), + 4 => Ok(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize), + _ => Err(ArrowError::InvalidArgumentError(format!( + "Invalid integer size: {}", + size + ))), + } + } + + /// Calculate the size needed to store an integer + pub fn calculate_int_size(value: usize) -> usize { + if value <= u8::MAX as usize { + 1 + } else if value <= u16::MAX as usize { + 2 + } else if value <= 0xFFFFFF { + 3 + } else { + 4 + } + } + + /// Build object header byte + pub fn build_object_header( + is_large: bool, + field_id_size: usize, + field_offset_size: usize, + ) -> u8 { + let large_bit = if is_large { 1 } else { 0 }; + (large_bit << 6) + | (((field_id_size - 1) as u8) << 4) + | (((field_offset_size - 1) as u8) << 2) + | 2 + } + + /// Write integer bytes to buffer + pub fn write_int_bytes(buffer: &mut Vec<u8>, value: usize, size: usize) { + match size { + 1 => buffer.push(value as u8), + 2 => buffer.extend_from_slice(&(value as u16).to_le_bytes()), + 3 => { + let bytes = (value as u32).to_le_bytes(); + buffer.extend_from_slice(&bytes[..3]); + } + 4 => buffer.extend_from_slice(&(value as u32).to_le_bytes()), + _ => panic!("Invalid size: {}", size), + } + } + + + + /// Check if value bytes represent a primitive + pub fn is_primitive(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::Primitive + } + + /// Check if value bytes represent a short string + pub fn is_short_string(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::ShortString + } + + /// Check if value bytes represent an object + pub fn is_object(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::Object + } + + /// 
Check if value bytes represent an array + pub fn is_array(value_bytes: &[u8]) -> bool { + if value_bytes.is_empty() { + return false; + } + Self::get_basic_type(value_bytes[0]) == VariantBasicType::Array + } + + /// Get the data length for a primitive type + /// Returns Some(len) for fixed-length types, None for variable-length types + pub fn get_primitive_data_length(primitive_type: &PrimitiveType) -> Option<usize> { + match primitive_type { + PrimitiveType::Null | PrimitiveType::True | PrimitiveType::False => Some(0), + PrimitiveType::Int8 => Some(1), + PrimitiveType::Int16 => Some(2), + PrimitiveType::Int32 + | PrimitiveType::Float + | PrimitiveType::Decimal4 + | PrimitiveType::Date => Some(4), + PrimitiveType::Int64 + | PrimitiveType::Double + | PrimitiveType::Decimal8 + | PrimitiveType::TimestampNtz + | PrimitiveType::TimestampLtz => Some(8), + PrimitiveType::Decimal16 => Some(16), + PrimitiveType::Binary | PrimitiveType::String => None, // Variable length, need to read from data + } + } + + /// Extract short string data from value bytes + pub fn extract_short_string_data(value_bytes: &[u8]) -> Result<&[u8], ArrowError> { + if value_bytes.is_empty() { + return Err(ArrowError::InvalidArgumentError( + "Empty value bytes".to_string(), + )); + } + + let header = Self::parse_short_string_header(value_bytes[0])?; + + if value_bytes.len() < 1 + header.length { + return Err(ArrowError::InvalidArgumentError(format!( + "Short string data length {} exceeds available bytes", + header.length + ))); + } + + Ok(&value_bytes[1..1 + header.length]) + } + + /// Extract primitive data from value bytes + pub fn extract_primitive_data(value_bytes: &[u8]) -> Result<&[u8], ArrowError> { + if value_bytes.is_empty() { + return Err(ArrowError::InvalidArgumentError( + "Empty value bytes".to_string(), + )); + } + + let primitive_type = Self::parse_primitive_header(value_bytes[0])?; + let data_length = Self::get_primitive_data_length(&primitive_type); + + match data_length { + Some(0) => 
{ + // Fixed-length 0-byte types (null/true/false) + Ok(&[]) + } + Some(len) => { + // Fixed-length types with len bytes + if value_bytes.len() < 1 + len { + return Err(ArrowError::InvalidArgumentError(format!( + "Fixed length primitive data length {} exceeds available bytes", + len + ))); + } + Ok(&value_bytes[1..1 + len]) + } + None => { + // Variable-length types (binary/string) - read length from data + if value_bytes.len() < 5 { + return Err(ArrowError::InvalidArgumentError( + "Not enough bytes for variable length primitive".to_string(), + )); + } + let length = u32::from_le_bytes([ + value_bytes[1], + value_bytes[2], + value_bytes[3], + value_bytes[4], + ]) as usize; + if value_bytes.len() < 5 + length { + return Err(ArrowError::InvalidArgumentError( + "Variable length primitive data exceeds available bytes".to_string(), + )); + } + Ok(&value_bytes[5..5 + length]) + } + } + } + + /// Calculate byte offsets for array elements + pub fn calculate_array_offsets(header: &ArrayHeader, num_elements: usize) -> ArrayOffsets { + let element_offsets_start = 1 + header.num_elements_size; + let elements_start = + element_offsets_start + ((num_elements + 1) * header.element_offset_size); Review Comment: integer overflow risk on 32-bit arch... see decoder.rs (again below) ########## parquet-variant-compute/src/field_operations.rs: ########## @@ -0,0 +1,492 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Field extraction and removal operations for variant objects + +use crate::variant_parser::{ObjectHeader, ObjectOffsets, VariantParser}; +use arrow::error::ArrowError; +use parquet_variant::{VariantMetadata, VariantPath, VariantPathElement}; +use std::collections::HashSet; + +/// Field operations for variant objects +pub struct FieldOperations; + +impl FieldOperations { + /// Extract field bytes from a single variant object + pub fn extract_field_bytes( + metadata_bytes: &[u8], + value_bytes: &[u8], + field_name: &str, + ) -> Result<Option<Vec<u8>>, ArrowError> { + if !VariantParser::is_object(value_bytes) { + return Ok(None); + } + + let header_byte = value_bytes[0]; + let header = VariantParser::parse_object_header(header_byte)?; + let num_elements = VariantParser::unpack_int(&value_bytes[1..], header.num_elements_size)?; + let offsets = VariantParser::calculate_object_offsets(&header, num_elements); + + // Find field ID for the target field name + let target_field_id = Self::find_field_id(metadata_bytes, field_name)?; + let target_field_id = match target_field_id { + Some(id) => id, + None => return Ok(None), // Field not found + }; + + // Search for the field in the object + for i in 0..num_elements { + let field_id_offset = offsets.field_ids_start + (i * header.field_id_size); + let field_id = + VariantParser::unpack_int(&value_bytes[field_id_offset..], header.field_id_size)?; + + if field_id == target_field_id { + return Self::extract_field_value_at_index( + value_bytes, + &header, + &offsets, + i, + num_elements, + ); + } + } + + 
Ok(None) + } + + /// Remove field from a single variant object + pub fn remove_field_bytes( + metadata_bytes: &[u8], + value_bytes: &[u8], + field_name: &str, + ) -> Result<Option<Vec<u8>>, ArrowError> { + Self::remove_fields_bytes(metadata_bytes, value_bytes, &[field_name]) + } + + /// Remove multiple fields from a single variant object + pub fn remove_fields_bytes( + metadata_bytes: &[u8], + value_bytes: &[u8], + field_names: &[&str], + ) -> Result<Option<Vec<u8>>, ArrowError> { + if !VariantParser::is_object(value_bytes) { + return Ok(Some(value_bytes.to_vec())); + } + + let header_byte = value_bytes[0]; + let header = VariantParser::parse_object_header(header_byte)?; + let num_elements = VariantParser::unpack_int(&value_bytes[1..], header.num_elements_size)?; + let offsets = VariantParser::calculate_object_offsets(&header, num_elements); + + // Find field IDs for target field names + let target_field_ids = Self::find_field_ids(metadata_bytes, field_names)?; + + if target_field_ids.is_empty() { + return Ok(Some(value_bytes.to_vec())); // No fields to remove + } + + // Collect fields to keep + let fields_to_keep = Self::collect_fields_to_keep( + value_bytes, + &header, + &offsets, + num_elements, + &target_field_ids, + )?; + + if fields_to_keep.len() == num_elements { + return Ok(Some(value_bytes.to_vec())); // No fields were removed + } + + // Sort fields by name for proper variant object ordering + let sorted_fields = Self::sort_fields_by_name(metadata_bytes, fields_to_keep)?; + + // Reconstruct object with remaining fields + Self::reconstruct_object(sorted_fields) + } + + /// Find field ID for a given field name + fn find_field_id(metadata_bytes: &[u8], field_name: &str) -> Result<Option<usize>, ArrowError> { + let metadata = VariantMetadata::try_new(metadata_bytes)?; + + for dict_idx in 0..metadata.len() { + if let Ok(name) = metadata.get(dict_idx) { + if name == field_name { + return Ok(Some(dict_idx)); + } + } + } + + Ok(None) + } + + /// Find field IDs for 
multiple field names + fn find_field_ids( + metadata_bytes: &[u8], + field_names: &[&str], + ) -> Result<HashSet<usize>, ArrowError> { + let metadata = VariantMetadata::try_new(metadata_bytes)?; + let mut target_field_ids = HashSet::new(); + + for field_name in field_names { + for dict_idx in 0..metadata.len() { + if let Ok(name) = metadata.get(dict_idx) { Review Comment: This will be horribly expensive for a big metadata dictionary... we should at least track a TODO to use binary search for non-tiny dictionaries. See similar code in parquet-variant crate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org