parthchandra commented on code in PR #3221: URL: https://github.com/apache/datafusion-comet/pull/3221#discussion_r2710477995
########## native/core/src/execution/columnar_to_row.rs: ########## @@ -0,0 +1,2752 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Native implementation of columnar to row conversion for Spark UnsafeRow format. +//! +//! This module converts Arrow columnar data to Spark's UnsafeRow format, which is used +//! for row-based operations in Spark. The conversion is done in native code for better +//! performance compared to the JVM implementation. +//! +//! # UnsafeRow Format +//! +//! ```text +//! ┌─────────────────────────────────────────────────────────────┐ +//! │ Null Bitset: ((numFields + 63) / 64) * 8 bytes │ +//! ├─────────────────────────────────────────────────────────────┤ +//! │ Fixed-width portion: 8 bytes per field │ +//! │ - Primitives: value stored directly (in lowest bytes) │ +//! │ - Variable-length: (offset << 32) | length │ +//! ├─────────────────────────────────────────────────────────────┤ +//! │ Variable-length data: 8-byte aligned │ +//! └─────────────────────────────────────────────────────────────┘ +//! ``` + +use crate::errors::{CometError, CometResult}; +use arrow::array::types::{ + ArrowDictionaryKeyType, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, + UInt64Type, UInt8Type, +}; +use arrow::array::*; +use arrow::datatypes::{ArrowNativeType, DataType, TimeUnit}; +use std::sync::Arc; + +/// Maximum digits for decimal that can fit in a long (8 bytes). +const MAX_LONG_DIGITS: u8 = 18; + +/// Pre-downcast array reference to avoid type dispatch in inner loops. +/// This enum holds references to concrete array types, allowing direct access +/// without repeated downcast_ref calls. +enum TypedArray<'a> { + Boolean(&'a BooleanArray), + Int8(&'a Int8Array), + Int16(&'a Int16Array), + Int32(&'a Int32Array), + Int64(&'a Int64Array), + Float32(&'a Float32Array), + Float64(&'a Float64Array), + Date32(&'a Date32Array), + TimestampMicro(&'a TimestampMicrosecondArray), + Decimal128(&'a Decimal128Array, u8), // array + precision + String(&'a StringArray), + LargeString(&'a LargeStringArray), + Binary(&'a BinaryArray), + LargeBinary(&'a LargeBinaryArray), + Struct( + &'a StructArray, + arrow::datatypes::Fields, + Vec<TypedElements<'a>>, + ), + List(&'a ListArray, arrow::datatypes::FieldRef), + LargeList(&'a LargeListArray, arrow::datatypes::FieldRef), + Map(&'a MapArray, arrow::datatypes::FieldRef), + Dictionary(&'a ArrayRef, DataType), // fallback for dictionary types +} + +impl<'a> TypedArray<'a> { + /// Pre-downcast an ArrayRef to a TypedArray. + fn from_array(array: &'a ArrayRef, schema_type: &DataType) -> CometResult<Self> { + let actual_type = array.data_type(); + match actual_type { + DataType::Boolean => Ok(TypedArray::Boolean( + array + .as_any() + .downcast_ref::<BooleanArray>() + .ok_or_else(|| { + CometError::Internal("Failed to downcast to BooleanArray".to_string()) + })?, + )), + DataType::Int8 => Ok(TypedArray::Int8( + array.as_any().downcast_ref::<Int8Array>().ok_or_else(|| { + CometError::Internal("Failed to downcast to Int8Array".to_string()) + })?, + )), + DataType::Int16 => Ok(TypedArray::Int16( + array.as_any().downcast_ref::<Int16Array>().ok_or_else(|| { + CometError::Internal("Failed to downcast to Int16Array".to_string()) + })?, + )), + DataType::Int32 => Ok(TypedArray::Int32( + array.as_any().downcast_ref::<Int32Array>().ok_or_else(|| { + CometError::Internal("Failed to downcast to Int32Array".to_string()) + })?, + )), + DataType::Int64 => Ok(TypedArray::Int64( + array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| { + CometError::Internal("Failed to downcast to Int64Array".to_string()) + })?, + )), + DataType::Float32 => Ok(TypedArray::Float32( + array + .as_any() + .downcast_ref::<Float32Array>() + .ok_or_else(|| { + CometError::Internal("Failed to downcast to Float32Array".to_string()) + })?, + )), + DataType::Float64 => Ok(TypedArray::Float64( + array + .as_any() + .downcast_ref::<Float64Array>() + .ok_or_else(|| { + CometError::Internal("Failed to downcast to Float64Array".to_string()) + })?, + )), + DataType::Date32 => Ok(TypedArray::Date32( + array + .as_any() + .downcast_ref::<Date32Array>() + .ok_or_else(|| { + CometError::Internal("Failed to downcast to Date32Array".to_string()) + })?, + )), + DataType::Timestamp(TimeUnit::Microsecond, _) => Ok(TypedArray::TimestampMicro( + array + .as_any() + .downcast_ref::<TimestampMicrosecondArray>() + .ok_or_else(|| { + CometError::Internal( + "Failed to downcast to TimestampMicrosecondArray".to_string(), + ) + })?, + )), + DataType::Decimal128(p, _) => Ok(TypedArray::Decimal128( + array + .as_any() + .downcast_ref::<Decimal128Array>() + .ok_or_else(|| { + CometError::Internal("Failed to downcast to Decimal128Array".to_string()) + })?, + *p, + )), + DataType::Utf8 => Ok(TypedArray::String( + array + .as_any() + .downcast_ref::<StringArray>() + .ok_or_else(|| { + CometError::Internal("Failed to downcast to StringArray".to_string()) + })?, + )), + DataType::LargeUtf8 => Ok(TypedArray::LargeString( + array + .as_any() + .downcast_ref::<LargeStringArray>() + .ok_or_else(|| { + CometError::Internal("Failed to downcast to LargeStringArray".to_string()) + })?, + )), + DataType::Binary => Ok(TypedArray::Binary( + array + .as_any() + .downcast_ref::<BinaryArray>() + .ok_or_else(|| { + CometError::Internal("Failed to downcast to BinaryArray".to_string()) + })?, + )), + DataType::LargeBinary => Ok(TypedArray::LargeBinary( + array + .as_any() + .downcast_ref::<LargeBinaryArray>() + .ok_or_else(|| { + CometError::Internal("Failed to downcast to LargeBinaryArray".to_string()) + })?, + )), + DataType::Struct(fields) => { + let struct_arr = array + .as_any() + .downcast_ref::<StructArray>() + .ok_or_else(|| { + CometError::Internal("Failed to downcast to StructArray".to_string()) + })?; + // Pre-downcast all struct fields once + let typed_fields: Vec<TypedElements> = fields + .iter() + .enumerate() + .map(|(idx, field)| { + TypedElements::from_array(struct_arr.column(idx), field.data_type()) + }) + .collect(); + Ok(TypedArray::Struct(struct_arr, fields.clone(), typed_fields)) + } + DataType::List(field) => Ok(TypedArray::List( + array.as_any().downcast_ref::<ListArray>().ok_or_else(|| { + CometError::Internal("Failed to downcast to ListArray".to_string()) + })?, + Arc::clone(field), + )), + DataType::LargeList(field) => Ok(TypedArray::LargeList( + array + .as_any() + .downcast_ref::<LargeListArray>() + .ok_or_else(|| { + CometError::Internal("Failed to downcast to LargeListArray".to_string()) + })?, + Arc::clone(field), + )), + DataType::Map(field, _) => Ok(TypedArray::Map( + array.as_any().downcast_ref::<MapArray>().ok_or_else(|| { + CometError::Internal("Failed to downcast to MapArray".to_string()) + })?, + Arc::clone(field), + )), + DataType::Dictionary(_, _) => Ok(TypedArray::Dictionary(array, schema_type.clone())), + _ => Err(CometError::Internal(format!( + "Unsupported data type for pre-downcast: {:?}", + actual_type + ))), + } + } + + /// Check if the value at the given index is null. + #[inline] + fn is_null(&self, row_idx: usize) -> bool { + match self { + TypedArray::Boolean(arr) => arr.is_null(row_idx), + TypedArray::Int8(arr) => arr.is_null(row_idx), + TypedArray::Int16(arr) => arr.is_null(row_idx), + TypedArray::Int32(arr) => arr.is_null(row_idx), + TypedArray::Int64(arr) => arr.is_null(row_idx), + TypedArray::Float32(arr) => arr.is_null(row_idx), + TypedArray::Float64(arr) => arr.is_null(row_idx), + TypedArray::Date32(arr) => arr.is_null(row_idx), + TypedArray::TimestampMicro(arr) => arr.is_null(row_idx), + TypedArray::Decimal128(arr, _) => arr.is_null(row_idx), + TypedArray::String(arr) => arr.is_null(row_idx), + TypedArray::LargeString(arr) => arr.is_null(row_idx), + TypedArray::Binary(arr) => arr.is_null(row_idx), + TypedArray::LargeBinary(arr) => arr.is_null(row_idx), + TypedArray::Struct(arr, _, _) => arr.is_null(row_idx), + TypedArray::List(arr, _) => arr.is_null(row_idx), + TypedArray::LargeList(arr, _) => arr.is_null(row_idx), + TypedArray::Map(arr, _) => arr.is_null(row_idx), + TypedArray::Dictionary(arr, _) => arr.is_null(row_idx), + } + } + + /// Get the fixed-width value as i64 (for types that fit in 8 bytes). + #[inline] + fn get_fixed_value(&self, row_idx: usize) -> i64 { + match self { + TypedArray::Boolean(arr) => { + if arr.value(row_idx) { + 1i64 + } else { + 0i64 + } + } + TypedArray::Int8(arr) => arr.value(row_idx) as i64, + TypedArray::Int16(arr) => arr.value(row_idx) as i64, + TypedArray::Int32(arr) => arr.value(row_idx) as i64, + TypedArray::Int64(arr) => arr.value(row_idx), + TypedArray::Float32(arr) => arr.value(row_idx).to_bits() as i64, + TypedArray::Float64(arr) => arr.value(row_idx).to_bits() as i64, + TypedArray::Date32(arr) => arr.value(row_idx) as i64, + TypedArray::TimestampMicro(arr) => arr.value(row_idx), + TypedArray::Decimal128(arr, precision) => { + if *precision <= MAX_LONG_DIGITS { + arr.value(row_idx) as i64 + } else { + 0 // Variable-length decimal, handled elsewhere + } + } + // Variable-length types return 0, actual value written separately + _ => 0, + } + } + + /// Check if this is a variable-length type. + #[inline] + fn is_variable_length(&self) -> bool { + match self { + TypedArray::Boolean(_) + | TypedArray::Int8(_) + | TypedArray::Int16(_) + | TypedArray::Int32(_) + | TypedArray::Int64(_) + | TypedArray::Float32(_) + | TypedArray::Float64(_) + | TypedArray::Date32(_) + | TypedArray::TimestampMicro(_) => false, + TypedArray::Decimal128(_, precision) => *precision > MAX_LONG_DIGITS, + _ => true, + } + } + + /// Write variable-length data to buffer. Returns actual length (0 if not variable-length). + fn write_variable_to_buffer(&self, buffer: &mut Vec<u8>, row_idx: usize) -> CometResult<usize> { + match self { + TypedArray::String(arr) => { + let bytes = arr.value(row_idx).as_bytes(); + let len = bytes.len(); + buffer.extend_from_slice(bytes); + let padding = round_up_to_8(len) - len; + buffer.extend(std::iter::repeat_n(0u8, padding)); + Ok(len) + } + TypedArray::LargeString(arr) => { + let bytes = arr.value(row_idx).as_bytes(); + let len = bytes.len(); + buffer.extend_from_slice(bytes); + let padding = round_up_to_8(len) - len; + buffer.extend(std::iter::repeat_n(0u8, padding)); + Ok(len) + } + TypedArray::Binary(arr) => { + let bytes = arr.value(row_idx); + let len = bytes.len(); + buffer.extend_from_slice(bytes); + let padding = round_up_to_8(len) - len; + buffer.extend(std::iter::repeat_n(0u8, padding)); + Ok(len) + } + TypedArray::LargeBinary(arr) => { + let bytes = arr.value(row_idx); + let len = bytes.len(); + buffer.extend_from_slice(bytes); + let padding = round_up_to_8(len) - len; + buffer.extend(std::iter::repeat_n(0u8, padding)); + Ok(len) + } + TypedArray::Decimal128(arr, precision) if *precision > MAX_LONG_DIGITS => { + let bytes = i128_to_spark_decimal_bytes(arr.value(row_idx)); + let len = bytes.len(); + buffer.extend_from_slice(&bytes); + let padding = round_up_to_8(len) - len; + buffer.extend(std::iter::repeat_n(0u8, padding)); + Ok(len) + } + TypedArray::Struct(arr, fields, typed_fields) => { + write_struct_to_buffer_typed(buffer, arr, row_idx, fields, typed_fields) + } + TypedArray::List(arr, field) => write_list_to_buffer(buffer, arr, row_idx, field), + TypedArray::LargeList(arr, field) => { + write_large_list_to_buffer(buffer, arr, row_idx, field) + } + TypedArray::Map(arr, field) => write_map_to_buffer(buffer, arr, row_idx, field), + TypedArray::Dictionary(arr, schema_type) => { + if let DataType::Dictionary(key_type, value_type) = schema_type { + write_dictionary_to_buffer( + buffer, + arr, + row_idx, + key_type.as_ref(), + value_type.as_ref(), + ) + } else { + Err(CometError::Internal(format!( + "Expected Dictionary type but got {:?}", + schema_type + ))) + } + } + _ => Ok(0), // Fixed-width types + } + } +} + +/// Pre-downcast element array for list/array types. +/// This allows direct access to element values without per-row allocation. +enum TypedElements<'a> { + Boolean(&'a BooleanArray), + Int8(&'a Int8Array), + Int16(&'a Int16Array), + Int32(&'a Int32Array), + Int64(&'a Int64Array), + Float32(&'a Float32Array), + Float64(&'a Float64Array), + Date32(&'a Date32Array), + TimestampMicro(&'a TimestampMicrosecondArray), + Decimal128(&'a Decimal128Array, u8), + String(&'a StringArray), + LargeString(&'a LargeStringArray), + Binary(&'a BinaryArray), + LargeBinary(&'a LargeBinaryArray), + // For nested types, fall back to ArrayRef + Other(&'a ArrayRef, DataType), +} + +impl<'a> TypedElements<'a> { + /// Create from an ArrayRef and element type. + fn from_array(array: &'a ArrayRef, element_type: &DataType) -> Self { + match element_type { + DataType::Boolean => { + if let Some(arr) = array.as_any().downcast_ref::<BooleanArray>() { + return TypedElements::Boolean(arr); + } + } + DataType::Int8 => { + if let Some(arr) = array.as_any().downcast_ref::<Int8Array>() { + return TypedElements::Int8(arr); + } + } + DataType::Int16 => { + if let Some(arr) = array.as_any().downcast_ref::<Int16Array>() { + return TypedElements::Int16(arr); + } + } + DataType::Int32 => { + if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() { + return TypedElements::Int32(arr); + } + } + DataType::Int64 => { + if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() { + return TypedElements::Int64(arr); + } + } + DataType::Float32 => { + if let Some(arr) = array.as_any().downcast_ref::<Float32Array>() { + return TypedElements::Float32(arr); + } + } + DataType::Float64 => { + if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() { + return TypedElements::Float64(arr); + } + } + DataType::Date32 => { + if let Some(arr) = array.as_any().downcast_ref::<Date32Array>() { + return TypedElements::Date32(arr); + } + } + DataType::Timestamp(TimeUnit::Microsecond, _) => { + if let Some(arr) = array.as_any().downcast_ref::<TimestampMicrosecondArray>() { + return TypedElements::TimestampMicro(arr); + } + } + DataType::Decimal128(p, _) => { + if let Some(arr) = array.as_any().downcast_ref::<Decimal128Array>() { + return TypedElements::Decimal128(arr, *p); + } + } + DataType::Utf8 => { + if let Some(arr) = array.as_any().downcast_ref::<StringArray>() { + return TypedElements::String(arr); + } + } + DataType::LargeUtf8 => { + if let Some(arr) = array.as_any().downcast_ref::<LargeStringArray>() { + return TypedElements::LargeString(arr); + } + } + DataType::Binary => { + if let Some(arr) = array.as_any().downcast_ref::<BinaryArray>() { + return TypedElements::Binary(arr); + } + } + DataType::LargeBinary => { + if let Some(arr) = array.as_any().downcast_ref::<LargeBinaryArray>() { + return TypedElements::LargeBinary(arr); + } + } + _ => {} + } + TypedElements::Other(array, element_type.clone()) + } + + /// Get element size for UnsafeArrayData format. + fn element_size(&self) -> usize { + match self { + TypedElements::Boolean(_) => 1, + TypedElements::Int8(_) => 1, + TypedElements::Int16(_) => 2, + TypedElements::Int32(_) | TypedElements::Date32(_) | TypedElements::Float32(_) => 4, + TypedElements::Int64(_) + | TypedElements::TimestampMicro(_) + | TypedElements::Float64(_) => 8, + TypedElements::Decimal128(_, p) if *p <= MAX_LONG_DIGITS => 8, + _ => 8, // Variable-length uses 8 bytes for offset+length + } + } + + /// Check if this is a fixed-width primitive type that supports bulk copy. + fn supports_bulk_copy(&self) -> bool { + matches!( + self, + TypedElements::Int8(_) + | TypedElements::Int16(_) + | TypedElements::Int32(_) + | TypedElements::Int64(_) + | TypedElements::Float32(_) + | TypedElements::Float64(_) + | TypedElements::Date32(_) + | TypedElements::TimestampMicro(_) + ) + } + + /// Check if value at given index is null. + #[inline] + fn is_null_at(&self, idx: usize) -> bool { + match self { + TypedElements::Boolean(arr) => arr.is_null(idx), + TypedElements::Int8(arr) => arr.is_null(idx), + TypedElements::Int16(arr) => arr.is_null(idx), + TypedElements::Int32(arr) => arr.is_null(idx), + TypedElements::Int64(arr) => arr.is_null(idx), + TypedElements::Float32(arr) => arr.is_null(idx), + TypedElements::Float64(arr) => arr.is_null(idx), + TypedElements::Date32(arr) => arr.is_null(idx), + TypedElements::TimestampMicro(arr) => arr.is_null(idx), + TypedElements::Decimal128(arr, _) => arr.is_null(idx), + TypedElements::String(arr) => arr.is_null(idx), + TypedElements::LargeString(arr) => arr.is_null(idx), + TypedElements::Binary(arr) => arr.is_null(idx), + TypedElements::LargeBinary(arr) => arr.is_null(idx), + TypedElements::Other(arr, _) => arr.is_null(idx), + } + } + + /// Check if this is a fixed-width type (value fits in 8-byte slot). + #[inline] + fn is_fixed_width(&self) -> bool { + match self { + TypedElements::Boolean(_) + | TypedElements::Int8(_) + | TypedElements::Int16(_) + | TypedElements::Int32(_) + | TypedElements::Int64(_) + | TypedElements::Float32(_) + | TypedElements::Float64(_) + | TypedElements::Date32(_) + | TypedElements::TimestampMicro(_) => true, + TypedElements::Decimal128(_, p) => *p <= MAX_LONG_DIGITS, + _ => false, + } + } + + /// Get fixed-width value as i64 for the 8-byte field slot. + #[inline] + fn get_fixed_value(&self, idx: usize) -> i64 { + match self { + TypedElements::Boolean(arr) => { + if arr.value(idx) { + 1 + } else { + 0 + } + } + TypedElements::Int8(arr) => arr.value(idx) as i64, + TypedElements::Int16(arr) => arr.value(idx) as i64, + TypedElements::Int32(arr) => arr.value(idx) as i64, + TypedElements::Int64(arr) => arr.value(idx), + TypedElements::Float32(arr) => (arr.value(idx).to_bits() as i32) as i64, + TypedElements::Float64(arr) => arr.value(idx).to_bits() as i64, + TypedElements::Date32(arr) => arr.value(idx) as i64, + TypedElements::TimestampMicro(arr) => arr.value(idx), + TypedElements::Decimal128(arr, _) => arr.value(idx) as i64, + _ => 0, // Should not be called for variable-length types + } + } + + /// Write variable-length data to buffer. Returns length written (0 for fixed-width). + fn write_variable_value( + &self, + buffer: &mut Vec<u8>, + idx: usize, + base_offset: usize, + ) -> CometResult<usize> { + match self { + TypedElements::String(arr) => { + let bytes = arr.value(idx).as_bytes(); + let len = bytes.len(); + buffer.extend_from_slice(bytes); + let padding = round_up_to_8(len) - len; + buffer.extend(std::iter::repeat_n(0u8, padding)); + Ok(len) + } + TypedElements::LargeString(arr) => { + let bytes = arr.value(idx).as_bytes(); + let len = bytes.len(); + buffer.extend_from_slice(bytes); + let padding = round_up_to_8(len) - len; + buffer.extend(std::iter::repeat_n(0u8, padding)); + Ok(len) + } + TypedElements::Binary(arr) => { + let bytes = arr.value(idx); + let len = bytes.len(); + buffer.extend_from_slice(bytes); + let padding = round_up_to_8(len) - len; + buffer.extend(std::iter::repeat_n(0u8, padding)); + Ok(len) + } + TypedElements::LargeBinary(arr) => { + let bytes = arr.value(idx); + let len = bytes.len(); + buffer.extend_from_slice(bytes); + let padding = round_up_to_8(len) - len; + buffer.extend(std::iter::repeat_n(0u8, padding)); + Ok(len) + } + TypedElements::Decimal128(arr, precision) if *precision > MAX_LONG_DIGITS => { + let bytes = i128_to_spark_decimal_bytes(arr.value(idx)); + let len = bytes.len(); + buffer.extend_from_slice(&bytes); + let padding = round_up_to_8(len) - len; + buffer.extend(std::iter::repeat_n(0u8, padding)); + Ok(len) + } + TypedElements::Other(arr, element_type) => { + write_nested_variable_to_buffer(buffer, element_type, arr, idx, base_offset) + } + _ => Ok(0), // Fixed-width types + } + } + + /// Write a range of elements to buffer in UnsafeArrayData format. + /// Returns the total bytes written (including header). + fn write_range_to_buffer( + &self, + buffer: &mut Vec<u8>, + start_idx: usize, + num_elements: usize, + ) -> CometResult<usize> { + let element_size = self.element_size(); + let array_start = buffer.len(); + let element_bitset_width = ColumnarToRowContext::calculate_bitset_width(num_elements); + + // Write number of elements + buffer.extend_from_slice(&(num_elements as i64).to_le_bytes()); + + // Reserve space for null bitset + let null_bitset_start = buffer.len(); + buffer.resize(null_bitset_start + element_bitset_width, 0); + + // Reserve space for element values + let elements_start = buffer.len(); + let elements_total_size = round_up_to_8(num_elements * element_size); + buffer.resize(elements_start + elements_total_size, 0); + + // Try bulk copy for primitive types + if self.supports_bulk_copy() { + self.bulk_copy_range( + buffer, + null_bitset_start, + elements_start, + start_idx, + num_elements, + ); + return Ok(buffer.len() - array_start); + } + + // Handle other types element by element + self.write_elements_slow( + buffer, + array_start, + null_bitset_start, + elements_start, + element_size, + start_idx, + num_elements, + ) + } + + /// Bulk copy primitive values from a range. + #[inline] + fn bulk_copy_range( + &self, + buffer: &mut [u8], + null_bitset_start: usize, + elements_start: usize, + start_idx: usize, + num_elements: usize, + ) { + macro_rules! bulk_copy_range { + ($arr:expr, $elem_size:expr) => {{ + let values_slice = $arr.values(); + let byte_len = num_elements * $elem_size; + let src_start = start_idx * $elem_size; + let src_bytes = unsafe { + std::slice::from_raw_parts( + (values_slice.as_ptr() as *const u8).add(src_start), + byte_len, + ) + }; + buffer[elements_start..elements_start + byte_len].copy_from_slice(src_bytes); + + // Set null bits + if $arr.null_count() > 0 { + for i in 0..num_elements { + if $arr.is_null(start_idx + i) { + let word_idx = i / 64; + let bit_idx = i % 64; + let word_offset = null_bitset_start + word_idx * 8; + let mut word = i64::from_le_bytes( + buffer[word_offset..word_offset + 8].try_into().unwrap(), + ); + word |= 1i64 << bit_idx; + buffer[word_offset..word_offset + 8] + .copy_from_slice(&word.to_le_bytes()); + } + } + } + }}; + } + + match self { + TypedElements::Int8(arr) => bulk_copy_range!(arr, 1), + TypedElements::Int16(arr) => bulk_copy_range!(arr, 2), + TypedElements::Int32(arr) => bulk_copy_range!(arr, 4), + TypedElements::Int64(arr) => bulk_copy_range!(arr, 8), + TypedElements::Float32(arr) => bulk_copy_range!(arr, 4), + TypedElements::Float64(arr) => bulk_copy_range!(arr, 8), + TypedElements::Date32(arr) => bulk_copy_range!(arr, 4), + TypedElements::TimestampMicro(arr) => bulk_copy_range!(arr, 8), + _ => {} // Should not reach here due to supports_bulk_copy check + } + } + + /// Slow path for non-bulk-copyable types. + #[allow(clippy::too_many_arguments)] + fn write_elements_slow( + &self, + buffer: &mut Vec<u8>, + array_start: usize, + null_bitset_start: usize, + elements_start: usize, + element_size: usize, + start_idx: usize, + num_elements: usize, + ) -> CometResult<usize> { + match self { + TypedElements::Boolean(arr) => { + for i in 0..num_elements { + let src_idx = start_idx + i; + if arr.is_null(src_idx) { Review Comment: Nvm. I was reading this wrong. I misunderstood `arr` to be a `TypedArray` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
