fresh-borzoni commented on code in PR #440: URL: https://github.com/apache/fluss-rust/pull/440#discussion_r2909061074
########## crates/fluss/src/row/column_writer.rs: ########## @@ -0,0 +1,607 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Typed column writers that write directly from [`InternalRow`] to concrete +//! Arrow builders, bypassing the intermediate [`Datum`] enum and runtime +//! `downcast_mut` dispatch. + +use crate::error::Error::RowConvertError; +use crate::error::{Error, Result}; +use crate::metadata::DataType; +use crate::row::InternalRow; +use crate::row::datum::{ + MICROS_PER_MILLI, MILLIS_PER_SECOND, NANOS_PER_MILLI, append_decimal_to_builder, + millis_nanos_to_micros, millis_nanos_to_nanos, +}; +use arrow::array::{ + ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, + FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder, Int16Builder, + Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder, Time32SecondBuilder, + Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder, + TimestampMillisecondBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder, +}; +use arrow_schema::DataType as ArrowDataType; + +/// Estimated average byte size for variable-width columns (Utf8, Binary). +/// Used to pre-allocate data buffers and avoid reallocations during batch building. +const VARIABLE_WIDTH_AVG_BYTES: usize = 64; + +/// A typed column writer that reads one column from an [`InternalRow`] and +/// appends directly to a concrete Arrow builder — no intermediate [`Datum`], +/// no `as_any_mut().downcast_mut()`. +pub struct ColumnWriter { + pos: usize, + nullable: bool, + inner: TypedWriter, +} + +enum TypedWriter { + Bool(BooleanBuilder), + Int8(Int8Builder), + Int16(Int16Builder), + Int32(Int32Builder), + Int64(Int64Builder), + Float32(Float32Builder), + Float64(Float64Builder), + Char { + len: usize, + builder: StringBuilder, + }, + String(StringBuilder), + Bytes(BinaryBuilder), + Binary { + len: usize, + builder: FixedSizeBinaryBuilder, + }, + Decimal128 { + src_precision: usize, + src_scale: usize, + target_precision: u32, + target_scale: i64, + builder: Decimal128Builder, + }, + Date32(Date32Builder), + Time32Second(Time32SecondBuilder), + Time32Millisecond(Time32MillisecondBuilder), + Time64Microsecond(Time64MicrosecondBuilder), + Time64Nanosecond(Time64NanosecondBuilder), + TimestampNtzSecond { + precision: u32, + builder: TimestampSecondBuilder, + }, + TimestampNtzMillisecond { + precision: u32, + builder: TimestampMillisecondBuilder, + }, + TimestampNtzMicrosecond { + precision: u32, + builder: TimestampMicrosecondBuilder, + }, + TimestampNtzNanosecond { + precision: u32, + builder: TimestampNanosecondBuilder, + }, + TimestampLtzSecond { + precision: u32, + builder: TimestampSecondBuilder, + }, + TimestampLtzMillisecond { + precision: u32, + builder: TimestampMillisecondBuilder, + }, + TimestampLtzMicrosecond { + precision: u32, + builder: TimestampMicrosecondBuilder, + }, + TimestampLtzNanosecond { + precision: u32, + builder: TimestampNanosecondBuilder, + }, +} + +impl ColumnWriter { + /// Create a column writer for the given Fluss `DataType` and Arrow + /// `ArrowDataType` at position `pos` with the given pre-allocation + /// `capacity`. + pub fn create( + fluss_type: &DataType, + arrow_type: &ArrowDataType, + pos: usize, + capacity: usize, + ) -> Result<Self> { + let nullable = fluss_type.is_nullable(); + + let inner = match fluss_type { + DataType::Boolean(_) => TypedWriter::Bool(BooleanBuilder::with_capacity(capacity)), + DataType::TinyInt(_) => TypedWriter::Int8(Int8Builder::with_capacity(capacity)), + DataType::SmallInt(_) => TypedWriter::Int16(Int16Builder::with_capacity(capacity)), + DataType::Int(_) => TypedWriter::Int32(Int32Builder::with_capacity(capacity)), + DataType::BigInt(_) => TypedWriter::Int64(Int64Builder::with_capacity(capacity)), + DataType::Float(_) => TypedWriter::Float32(Float32Builder::with_capacity(capacity)), + DataType::Double(_) => TypedWriter::Float64(Float64Builder::with_capacity(capacity)), + DataType::Char(t) => TypedWriter::Char { + len: t.length() as usize, + builder: StringBuilder::with_capacity( + capacity, + capacity * VARIABLE_WIDTH_AVG_BYTES, + ), + }, + DataType::String(_) => TypedWriter::String(StringBuilder::with_capacity( + capacity, + capacity * VARIABLE_WIDTH_AVG_BYTES, + )), + DataType::Bytes(_) => TypedWriter::Bytes(BinaryBuilder::with_capacity( + capacity, + capacity * VARIABLE_WIDTH_AVG_BYTES, + )), + DataType::Binary(t) => { + let arrow_len: i32 = t.length().try_into().map_err(|_| Error::IllegalArgument { + message: format!( + "Binary length {} exceeds Arrow's maximum (i32::MAX)", + t.length() + ), + })?; + TypedWriter::Binary { + len: t.length(), + builder: FixedSizeBinaryBuilder::with_capacity(capacity, arrow_len), + } + } + DataType::Decimal(dt) => { + let (target_p, target_s) = match arrow_type { + ArrowDataType::Decimal128(p, s) => (*p, *s), + _ => { + return Err(Error::IllegalArgument { + message: format!( + "Expected Decimal128 Arrow type for Decimal, got: {arrow_type:?}" + ), + }); + } + }; + if target_s < 0 { + return Err(Error::IllegalArgument { + message: format!("Negative decimal scale {target_s} is not supported"), + }); + } + let builder = Decimal128Builder::with_capacity(capacity) + .with_precision_and_scale(target_p, target_s) + .map_err(|e| Error::IllegalArgument { + message: format!( + "Invalid decimal precision {target_p} or scale {target_s}: {e}" + ), + })?; + TypedWriter::Decimal128 { + src_precision: dt.precision() as usize, + src_scale: dt.scale() as usize, + target_precision: target_p as u32, + target_scale: target_s as i64, + builder, + } + } + DataType::Date(_) => TypedWriter::Date32(Date32Builder::with_capacity(capacity)), + DataType::Time(_) => match arrow_type { + ArrowDataType::Time32(arrow_schema::TimeUnit::Second) => { + TypedWriter::Time32Second(Time32SecondBuilder::with_capacity(capacity)) + } + ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond) => { + TypedWriter::Time32Millisecond(Time32MillisecondBuilder::with_capacity( + capacity, + )) + } + ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => { + TypedWriter::Time64Microsecond(Time64MicrosecondBuilder::with_capacity( + capacity, + )) + } + ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => { + TypedWriter::Time64Nanosecond(Time64NanosecondBuilder::with_capacity(capacity)) + } + _ => { + return Err(Error::IllegalArgument { + message: format!("Unsupported Arrow type for Time: {arrow_type:?}"), + }); + } + }, + DataType::Timestamp(t) => { + let precision = t.precision(); + match arrow_type { + ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, _) => { + TypedWriter::TimestampNtzSecond { + precision, + builder: TimestampSecondBuilder::with_capacity(capacity), + } + } + ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => { + TypedWriter::TimestampNtzMillisecond { + precision, Review Comment: Both NTZ and LTZ already map to Timestamp(unit, None) in to_arrow_type(), so the wildcard only ever matches None -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
