charlesdong1991 commented on code in PR #440: URL: https://github.com/apache/fluss-rust/pull/440#discussion_r2913600085
########## crates/fluss/src/row/column_writer.rs: ########## @@ -0,0 +1,607 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Typed column writers that write directly from [`InternalRow`] to concrete +//! Arrow builders, bypassing the intermediate [`Datum`] enum and runtime +//! `downcast_mut` dispatch. + +use crate::error::Error::RowConvertError; +use crate::error::{Error, Result}; +use crate::metadata::DataType; +use crate::row::InternalRow; +use crate::row::datum::{ + MICROS_PER_MILLI, MILLIS_PER_SECOND, NANOS_PER_MILLI, append_decimal_to_builder, + millis_nanos_to_micros, millis_nanos_to_nanos, +}; +use arrow::array::{ + ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, + FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder, Int16Builder, + Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder, Time32SecondBuilder, + Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder, + TimestampMillisecondBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder, +}; +use arrow_schema::DataType as ArrowDataType; + +/// Estimated average byte size for variable-width columns (Utf8, Binary). +/// Used to pre-allocate data buffers and avoid reallocations during batch building. +const VARIABLE_WIDTH_AVG_BYTES: usize = 64; + +/// A typed column writer that reads one column from an [`InternalRow`] and +/// appends directly to a concrete Arrow builder — no intermediate [`Datum`], +/// no `as_any_mut().downcast_mut()`. +pub struct ColumnWriter { + pos: usize, + nullable: bool, + inner: TypedWriter, +} + +enum TypedWriter { + Bool(BooleanBuilder), + Int8(Int8Builder), + Int16(Int16Builder), + Int32(Int32Builder), + Int64(Int64Builder), + Float32(Float32Builder), + Float64(Float64Builder), + Char { + len: usize, + builder: StringBuilder, + }, + String(StringBuilder), + Bytes(BinaryBuilder), + Binary { + len: usize, + builder: FixedSizeBinaryBuilder, + }, + Decimal128 { + src_precision: usize, + src_scale: usize, + target_precision: u32, + target_scale: i64, + builder: Decimal128Builder, + }, + Date32(Date32Builder), + Time32Second(Time32SecondBuilder), + Time32Millisecond(Time32MillisecondBuilder), + Time64Microsecond(Time64MicrosecondBuilder), + Time64Nanosecond(Time64NanosecondBuilder), + TimestampNtzSecond { + precision: u32, + builder: TimestampSecondBuilder, + }, + TimestampNtzMillisecond { + precision: u32, + builder: TimestampMillisecondBuilder, + }, + TimestampNtzMicrosecond { + precision: u32, + builder: TimestampMicrosecondBuilder, + }, + TimestampNtzNanosecond { + precision: u32, + builder: TimestampNanosecondBuilder, + }, + TimestampLtzSecond { + precision: u32, + builder: TimestampSecondBuilder, + }, + TimestampLtzMillisecond { + precision: u32, + builder: TimestampMillisecondBuilder, + }, + TimestampLtzMicrosecond { + precision: u32, + builder: TimestampMicrosecondBuilder, + }, + TimestampLtzNanosecond { + precision: u32, + builder: TimestampNanosecondBuilder, + }, +} + +impl ColumnWriter { + /// Create a column writer for the given Fluss `DataType` and Arrow + /// `ArrowDataType` at position `pos` with the given pre-allocation + /// `capacity`. + pub fn create( + fluss_type: &DataType, + arrow_type: &ArrowDataType, + pos: usize, + capacity: usize, + ) -> Result<Self> { + let nullable = fluss_type.is_nullable(); + + let inner = match fluss_type { + DataType::Boolean(_) => TypedWriter::Bool(BooleanBuilder::with_capacity(capacity)), + DataType::TinyInt(_) => TypedWriter::Int8(Int8Builder::with_capacity(capacity)), + DataType::SmallInt(_) => TypedWriter::Int16(Int16Builder::with_capacity(capacity)), + DataType::Int(_) => TypedWriter::Int32(Int32Builder::with_capacity(capacity)), + DataType::BigInt(_) => TypedWriter::Int64(Int64Builder::with_capacity(capacity)), + DataType::Float(_) => TypedWriter::Float32(Float32Builder::with_capacity(capacity)), + DataType::Double(_) => TypedWriter::Float64(Float64Builder::with_capacity(capacity)), + DataType::Char(t) => TypedWriter::Char { + len: t.length() as usize, + builder: StringBuilder::with_capacity( + capacity, + capacity.saturating_mul(VARIABLE_WIDTH_AVG_BYTES), + ), + }, + DataType::String(_) => TypedWriter::String(StringBuilder::with_capacity( + capacity, + capacity.saturating_mul(VARIABLE_WIDTH_AVG_BYTES), + )), + DataType::Bytes(_) => TypedWriter::Bytes(BinaryBuilder::with_capacity( + capacity, + capacity.saturating_mul(VARIABLE_WIDTH_AVG_BYTES), + )), + DataType::Binary(t) => { + let arrow_len: i32 = t.length().try_into().map_err(|_| Error::IllegalArgument { + message: format!( + "Binary length {} exceeds Arrow's maximum (i32::MAX)", + t.length() + ), + })?; + TypedWriter::Binary { + len: t.length(), + builder: FixedSizeBinaryBuilder::with_capacity(capacity, arrow_len), + } + } + DataType::Decimal(dt) => { + let (target_p, target_s) = match arrow_type { + ArrowDataType::Decimal128(p, s) => (*p, *s), + _ => { + return Err(Error::IllegalArgument { + message: format!( + "Expected Decimal128 Arrow type for Decimal, got: {arrow_type:?}" + ), + }); + } + }; + if target_s < 0 { + return Err(Error::IllegalArgument { + message: format!("Negative decimal scale {target_s} is not supported"), + }); + } + let builder = Decimal128Builder::with_capacity(capacity) + .with_precision_and_scale(target_p, target_s) + .map_err(|e| Error::IllegalArgument { + message: format!( + "Invalid decimal precision {target_p} or scale {target_s}: {e}" + ), + })?; + TypedWriter::Decimal128 { + src_precision: dt.precision() as usize, + src_scale: dt.scale() as usize, + target_precision: target_p as u32, + target_scale: target_s as i64, + builder, + } + } + DataType::Date(_) => TypedWriter::Date32(Date32Builder::with_capacity(capacity)), + DataType::Time(_) => match arrow_type { + ArrowDataType::Time32(arrow_schema::TimeUnit::Second) => { + TypedWriter::Time32Second(Time32SecondBuilder::with_capacity(capacity)) + } + ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond) => { + TypedWriter::Time32Millisecond(Time32MillisecondBuilder::with_capacity( + capacity, + )) + } + ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => { + TypedWriter::Time64Microsecond(Time64MicrosecondBuilder::with_capacity( + capacity, + )) + } + ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => { + TypedWriter::Time64Nanosecond(Time64NanosecondBuilder::with_capacity(capacity)) + } + _ => { + return Err(Error::IllegalArgument { + message: format!("Unsupported Arrow type for Time: {arrow_type:?}"), + }); + } + }, + DataType::Timestamp(t) => { + let precision = t.precision(); + match arrow_type { + ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, _) => { + TypedWriter::TimestampNtzSecond { + precision, + builder: TimestampSecondBuilder::with_capacity(capacity), + } + } + ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => { + TypedWriter::TimestampNtzMillisecond { + precision, + builder: TimestampMillisecondBuilder::with_capacity(capacity), + } + } + ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => { + TypedWriter::TimestampNtzMicrosecond { + precision, + builder: TimestampMicrosecondBuilder::with_capacity(capacity), + } + } + ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => { + TypedWriter::TimestampNtzNanosecond { + precision, + builder: TimestampNanosecondBuilder::with_capacity(capacity), + } + } + _ => { + return Err(Error::IllegalArgument { + message: format!( + "Unsupported Arrow type for Timestamp: {arrow_type:?}" + ), + }); + } + } + } + DataType::TimestampLTz(t) => { + let precision = t.precision(); + match arrow_type { + ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, _) => { + TypedWriter::TimestampLtzSecond { + precision, + builder: TimestampSecondBuilder::with_capacity(capacity), + } + } + ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => { + TypedWriter::TimestampLtzMillisecond { + precision, + builder: TimestampMillisecondBuilder::with_capacity(capacity), + } + } + ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => { + TypedWriter::TimestampLtzMicrosecond { + precision, + builder: TimestampMicrosecondBuilder::with_capacity(capacity), + } + } + ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => { + TypedWriter::TimestampLtzNanosecond { + precision, + builder: TimestampNanosecondBuilder::with_capacity(capacity), + } + } + _ => { + return Err(Error::IllegalArgument { + message: format!( + "Unsupported Arrow type for TimestampLTz: {arrow_type:?}" + ), + }); + } + } + } + _ => { + return Err(Error::IllegalArgument { + message: format!("Unsupported Fluss DataType: {fluss_type:?}"), + }); + } + }; + + Ok(Self { + pos, + nullable, + inner, + }) + } + + /// Read one value from `row` at this writer's column position and append it + /// directly to the concrete Arrow builder. + #[inline] + pub fn write_field(&mut self, row: &dyn InternalRow) -> Result<()> { + if self.nullable && row.is_null_at(self.pos)? { + self.append_null(); + return Ok(()); + } + self.write_non_null(row) + } + + /// Finish the builder, producing the final Arrow array. + pub fn finish(&mut self) -> ArrayRef { + self.as_builder_mut().finish() + } + + /// Clone-finish the builder for size estimation (does not reset the builder). + pub fn finish_cloned(&self) -> ArrayRef { + self.as_builder_ref().finish_cloned() + } + + fn append_null(&mut self) { + // Every concrete Arrow builder has `append_null()` but it is NOT on the + // `ArrayBuilder` trait, so we dispatch through the enum. Exhaustive + // matching ensures new variants won't compile without a null arm. + macro_rules! null { + ($b:expr) => { + $b.append_null() + }; + } + match &mut self.inner { + TypedWriter::Bool(b) => null!(b), + TypedWriter::Int8(b) => null!(b), + TypedWriter::Int16(b) => null!(b), + TypedWriter::Int32(b) => null!(b), + TypedWriter::Int64(b) => null!(b), + TypedWriter::Float32(b) => null!(b), + TypedWriter::Float64(b) => null!(b), + TypedWriter::Char { builder, .. } => null!(builder), + TypedWriter::String(b) => null!(b), + TypedWriter::Bytes(b) => null!(b), + TypedWriter::Binary { builder, .. } => null!(builder), + TypedWriter::Decimal128 { builder, .. } => null!(builder), + TypedWriter::Date32(b) => null!(b), + TypedWriter::Time32Second(b) => null!(b), + TypedWriter::Time32Millisecond(b) => null!(b), + TypedWriter::Time64Microsecond(b) => null!(b), + TypedWriter::Time64Nanosecond(b) => null!(b), + TypedWriter::TimestampNtzSecond { builder, .. } => null!(builder), + TypedWriter::TimestampNtzMillisecond { builder, .. } => null!(builder), + TypedWriter::TimestampNtzMicrosecond { builder, .. } => null!(builder), + TypedWriter::TimestampNtzNanosecond { builder, .. } => null!(builder), + TypedWriter::TimestampLtzSecond { builder, .. } => null!(builder), + TypedWriter::TimestampLtzMillisecond { builder, .. } => null!(builder), + TypedWriter::TimestampLtzMicrosecond { builder, .. } => null!(builder), + TypedWriter::TimestampLtzNanosecond { builder, .. } => null!(builder), + } + } + + /// Returns a trait-object reference to the inner builder. + /// Used for type-agnostic operations (`finish`, `finish_cloned`). + fn as_builder_mut(&mut self) -> &mut dyn ArrayBuilder { + match &mut self.inner { + TypedWriter::Bool(b) => b, + TypedWriter::Int8(b) => b, + TypedWriter::Int16(b) => b, + TypedWriter::Int32(b) => b, + TypedWriter::Int64(b) => b, + TypedWriter::Float32(b) => b, + TypedWriter::Float64(b) => b, + TypedWriter::Char { builder, .. } => builder, + TypedWriter::String(b) => b, + TypedWriter::Bytes(b) => b, + TypedWriter::Binary { builder, .. } => builder, + TypedWriter::Decimal128 { builder, .. } => builder, + TypedWriter::Date32(b) => b, + TypedWriter::Time32Second(b) => b, + TypedWriter::Time32Millisecond(b) => b, + TypedWriter::Time64Microsecond(b) => b, + TypedWriter::Time64Nanosecond(b) => b, + TypedWriter::TimestampNtzSecond { builder, .. } => builder, + TypedWriter::TimestampNtzMillisecond { builder, .. } => builder, + TypedWriter::TimestampNtzMicrosecond { builder, .. } => builder, + TypedWriter::TimestampNtzNanosecond { builder, .. } => builder, + TypedWriter::TimestampLtzSecond { builder, .. } => builder, + TypedWriter::TimestampLtzMillisecond { builder, .. } => builder, + TypedWriter::TimestampLtzMicrosecond { builder, .. } => builder, + TypedWriter::TimestampLtzNanosecond { builder, .. } => builder, + } + } + + fn as_builder_ref(&self) -> &dyn ArrayBuilder { + match &self.inner { + TypedWriter::Bool(b) => b, + TypedWriter::Int8(b) => b, + TypedWriter::Int16(b) => b, + TypedWriter::Int32(b) => b, + TypedWriter::Int64(b) => b, + TypedWriter::Float32(b) => b, + TypedWriter::Float64(b) => b, + TypedWriter::Char { builder, .. } => builder, + TypedWriter::String(b) => b, + TypedWriter::Bytes(b) => b, + TypedWriter::Binary { builder, .. } => builder, + TypedWriter::Decimal128 { builder, .. } => builder, + TypedWriter::Date32(b) => b, + TypedWriter::Time32Second(b) => b, + TypedWriter::Time32Millisecond(b) => b, + TypedWriter::Time64Microsecond(b) => b, + TypedWriter::Time64Nanosecond(b) => b, + TypedWriter::TimestampNtzSecond { builder, .. } => builder, + TypedWriter::TimestampNtzMillisecond { builder, .. } => builder, + TypedWriter::TimestampNtzMicrosecond { builder, .. } => builder, + TypedWriter::TimestampNtzNanosecond { builder, .. } => builder, + TypedWriter::TimestampLtzSecond { builder, .. } => builder, + TypedWriter::TimestampLtzMillisecond { builder, .. } => builder, + TypedWriter::TimestampLtzMicrosecond { builder, .. } => builder, + TypedWriter::TimestampLtzNanosecond { builder, .. } => builder, + } + } + + #[inline] + fn write_non_null(&mut self, row: &dyn InternalRow) -> Result<()> { + let pos = self.pos; + + match &mut self.inner { + TypedWriter::Bool(b) => { + b.append_value(row.get_boolean(pos)?); + Ok(()) + } + TypedWriter::Int8(b) => { + b.append_value(row.get_byte(pos)?); + Ok(()) + } + TypedWriter::Int16(b) => { + b.append_value(row.get_short(pos)?); + Ok(()) + } + TypedWriter::Int32(b) => { + b.append_value(row.get_int(pos)?); + Ok(()) + } + TypedWriter::Int64(b) => { + b.append_value(row.get_long(pos)?); + Ok(()) + } + TypedWriter::Float32(b) => { + b.append_value(row.get_float(pos)?); + Ok(()) + } + TypedWriter::Float64(b) => { + b.append_value(row.get_double(pos)?); + Ok(()) + } + TypedWriter::Char { len, builder } => { + let v = row.get_char(pos, *len)?; + builder.append_value(v); + Ok(()) + } + TypedWriter::String(b) => { + let v = row.get_string(pos)?; + b.append_value(v); + Ok(()) + } + TypedWriter::Bytes(b) => { + let v = row.get_bytes(pos)?; + b.append_value(v); + Ok(()) + } + TypedWriter::Binary { len, builder } => { + let v = row.get_binary(pos, *len)?; + builder.append_value(v).map_err(|e| RowConvertError { + message: format!("Failed to append binary value: {e}"), + })?; + Ok(()) + } + TypedWriter::Decimal128 { + src_precision, + src_scale, + target_precision, + target_scale, + builder, + } => { + let decimal = row.get_decimal(pos, *src_precision, *src_scale)?; + append_decimal_to_builder(&decimal, *target_precision, *target_scale, builder) + } + TypedWriter::Date32(b) => { + let date = row.get_date(pos)?; + b.append_value(date.get_inner()); + Ok(()) + } + TypedWriter::Time32Second(b) => { + let millis = row.get_time(pos)?.get_inner(); + if millis % MILLIS_PER_SECOND as i32 != 0 { + return Err(RowConvertError { + message: format!( + "Time value {millis} ms has sub-second precision but schema expects seconds only" + ), + }); + } + b.append_value(millis / MILLIS_PER_SECOND as i32); + Ok(()) + } + TypedWriter::Time32Millisecond(b) => { + b.append_value(row.get_time(pos)?.get_inner()); + Ok(()) + } + TypedWriter::Time64Microsecond(b) => { + let millis = row.get_time(pos)?.get_inner(); + let micros = (millis as i64) + .checked_mul(MICROS_PER_MILLI) + .ok_or_else(|| RowConvertError { + message: format!( + "Time value {millis} ms overflows when converting to microseconds" + ), + })?; + b.append_value(micros); + Ok(()) + } + TypedWriter::Time64Nanosecond(b) => { + let millis = row.get_time(pos)?.get_inner(); + let nanos = (millis as i64) + .checked_mul(NANOS_PER_MILLI) + .ok_or_else(|| RowConvertError { + message: format!( + "Time value {millis} ms overflows when converting to nanoseconds" + ), + })?; + b.append_value(nanos); + Ok(()) + } + // --- TimestampNtz variants --- + TypedWriter::TimestampNtzSecond { + precision, builder, .. + } => { + let ts = row.get_timestamp_ntz(pos, *precision)?; + builder.append_value(ts.get_millisecond() / MILLIS_PER_SECOND); + Ok(()) + } + TypedWriter::TimestampNtzMillisecond { + precision, builder, .. + } => { + let ts = row.get_timestamp_ntz(pos, *precision)?; + builder.append_value(ts.get_millisecond()); + Ok(()) + } + TypedWriter::TimestampNtzMicrosecond { + precision, builder, .. + } => { + let ts = row.get_timestamp_ntz(pos, *precision)?; + builder.append_value(millis_nanos_to_micros( + ts.get_millisecond(), + ts.get_nano_of_millisecond(), + )?); + Ok(()) + } + TypedWriter::TimestampNtzNanosecond { + precision, builder, .. + } => { + let ts = row.get_timestamp_ntz(pos, *precision)?; + builder.append_value(millis_nanos_to_nanos( + ts.get_millisecond(), + ts.get_nano_of_millisecond(), + )?); + Ok(()) + } + // --- TimestampLtz variants --- + TypedWriter::TimestampLtzSecond { + precision, builder, .. + } => { + let ts = row.get_timestamp_ltz(pos, *precision)?; + builder.append_value(ts.get_epoch_millisecond() / MILLIS_PER_SECOND); + Ok(()) + } + TypedWriter::TimestampLtzMillisecond { + precision, builder, .. + } => { + let ts = row.get_timestamp_ltz(pos, *precision)?; + builder.append_value(ts.get_epoch_millisecond()); + Ok(()) + } + TypedWriter::TimestampLtzMicrosecond { + precision, builder, .. + } => { + let ts = row.get_timestamp_ltz(pos, *precision)?; + builder.append_value(millis_nanos_to_micros( + ts.get_epoch_millisecond(), + ts.get_nano_of_millisecond(), + )?); + Ok(()) + } + TypedWriter::TimestampLtzNanosecond { + precision, builder, .. + } => { + let ts = row.get_timestamp_ltz(pos, *precision)?; + builder.append_value(millis_nanos_to_nanos( + ts.get_epoch_millisecond(), + ts.get_nano_of_millisecond(), + )?); + Ok(()) + } + } + } +} Review Comment: Nit: i wonder if we want to have dedicated tests for column writer? now we do coverage through batch builder, but having dedicated ones can make regression easier to spot ########## crates/fluss/src/row/column_writer.rs: ########## @@ -0,0 +1,607 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Typed column writers that write directly from [`InternalRow`] to concrete +//! Arrow builders, bypassing the intermediate [`Datum`] enum and runtime +//! `downcast_mut` dispatch. + +use crate::error::Error::RowConvertError; +use crate::error::{Error, Result}; +use crate::metadata::DataType; +use crate::row::InternalRow; +use crate::row::datum::{ + MICROS_PER_MILLI, MILLIS_PER_SECOND, NANOS_PER_MILLI, append_decimal_to_builder, + millis_nanos_to_micros, millis_nanos_to_nanos, +}; +use arrow::array::{ + ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, + FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder, Int16Builder, + Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder, Time32SecondBuilder, + Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder, + TimestampMillisecondBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder, +}; +use arrow_schema::DataType as ArrowDataType; + +/// Estimated average byte size for variable-width columns (Utf8, Binary). +/// Used to pre-allocate data buffers and avoid reallocations during batch building. +const VARIABLE_WIDTH_AVG_BYTES: usize = 64; + +/// A typed column writer that reads one column from an [`InternalRow`] and +/// appends directly to a concrete Arrow builder — no intermediate [`Datum`], +/// no `as_any_mut().downcast_mut()`. +pub struct ColumnWriter { + pos: usize, + nullable: bool, + inner: TypedWriter, +} + +enum TypedWriter { + Bool(BooleanBuilder), + Int8(Int8Builder), + Int16(Int16Builder), + Int32(Int32Builder), + Int64(Int64Builder), + Float32(Float32Builder), + Float64(Float64Builder), + Char { + len: usize, + builder: StringBuilder, + }, + String(StringBuilder), + Bytes(BinaryBuilder), + Binary { + len: usize, + builder: FixedSizeBinaryBuilder, + }, + Decimal128 { + src_precision: usize, + src_scale: usize, + target_precision: u32, + target_scale: i64, + builder: Decimal128Builder, + }, + Date32(Date32Builder), + Time32Second(Time32SecondBuilder), + Time32Millisecond(Time32MillisecondBuilder), + Time64Microsecond(Time64MicrosecondBuilder), + Time64Nanosecond(Time64NanosecondBuilder), + TimestampNtzSecond { + precision: u32, + builder: TimestampSecondBuilder, + }, + TimestampNtzMillisecond { + precision: u32, + builder: TimestampMillisecondBuilder, + }, + TimestampNtzMicrosecond { + precision: u32, + builder: TimestampMicrosecondBuilder, + }, + TimestampNtzNanosecond { + precision: u32, + builder: TimestampNanosecondBuilder, + }, + TimestampLtzSecond { + precision: u32, + builder: TimestampSecondBuilder, + }, + TimestampLtzMillisecond { + precision: u32, + builder: TimestampMillisecondBuilder, + }, + TimestampLtzMicrosecond { + precision: u32, + builder: TimestampMicrosecondBuilder, + }, + TimestampLtzNanosecond { + precision: u32, + builder: TimestampNanosecondBuilder, + }, +} + +impl ColumnWriter { + /// Create a column writer for the given Fluss `DataType` and Arrow + /// `ArrowDataType` at position `pos` with the given pre-allocation + /// `capacity`. + pub fn create( + fluss_type: &DataType, + arrow_type: &ArrowDataType, + pos: usize, + capacity: usize, + ) -> Result<Self> { + let nullable = fluss_type.is_nullable(); + + let inner = match fluss_type { + DataType::Boolean(_) => TypedWriter::Bool(BooleanBuilder::with_capacity(capacity)), + DataType::TinyInt(_) => TypedWriter::Int8(Int8Builder::with_capacity(capacity)), + DataType::SmallInt(_) => TypedWriter::Int16(Int16Builder::with_capacity(capacity)), + DataType::Int(_) => TypedWriter::Int32(Int32Builder::with_capacity(capacity)), + DataType::BigInt(_) => TypedWriter::Int64(Int64Builder::with_capacity(capacity)), + DataType::Float(_) => TypedWriter::Float32(Float32Builder::with_capacity(capacity)), + DataType::Double(_) => TypedWriter::Float64(Float64Builder::with_capacity(capacity)), + DataType::Char(t) => TypedWriter::Char { + len: t.length() as usize, + builder: StringBuilder::with_capacity( + capacity, + capacity.saturating_mul(VARIABLE_WIDTH_AVG_BYTES), + ), + }, + DataType::String(_) => TypedWriter::String(StringBuilder::with_capacity( + capacity, + capacity.saturating_mul(VARIABLE_WIDTH_AVG_BYTES), + )), + DataType::Bytes(_) => TypedWriter::Bytes(BinaryBuilder::with_capacity( + capacity, + capacity.saturating_mul(VARIABLE_WIDTH_AVG_BYTES), + )), + DataType::Binary(t) => { + let arrow_len: i32 = t.length().try_into().map_err(|_| Error::IllegalArgument { + message: format!( + "Binary length {} exceeds Arrow's maximum (i32::MAX)", + t.length() + ), + })?; + TypedWriter::Binary { + len: t.length(), + builder: FixedSizeBinaryBuilder::with_capacity(capacity, arrow_len), + } + } + DataType::Decimal(dt) => { + let (target_p, target_s) = match arrow_type { + ArrowDataType::Decimal128(p, s) => (*p, *s), + _ => { + return Err(Error::IllegalArgument { + message: format!( + "Expected Decimal128 Arrow type for Decimal, got: {arrow_type:?}" + ), + }); + } + }; + if target_s < 0 { + return Err(Error::IllegalArgument { + message: format!("Negative decimal scale {target_s} is not supported"), + }); + } + let builder = Decimal128Builder::with_capacity(capacity) + .with_precision_and_scale(target_p, target_s) + .map_err(|e| Error::IllegalArgument { + message: format!( + "Invalid decimal precision {target_p} or scale {target_s}: {e}" + ), + })?; + TypedWriter::Decimal128 { + src_precision: dt.precision() as usize, + src_scale: dt.scale() as usize, + target_precision: target_p as u32, + target_scale: target_s as i64, + builder, + } + } + DataType::Date(_) => TypedWriter::Date32(Date32Builder::with_capacity(capacity)), + DataType::Time(_) => match arrow_type { + ArrowDataType::Time32(arrow_schema::TimeUnit::Second) => { + TypedWriter::Time32Second(Time32SecondBuilder::with_capacity(capacity)) + } + ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond) => { + TypedWriter::Time32Millisecond(Time32MillisecondBuilder::with_capacity( + capacity, + )) + } + ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => { + TypedWriter::Time64Microsecond(Time64MicrosecondBuilder::with_capacity( + capacity, + )) + } + ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => { + TypedWriter::Time64Nanosecond(Time64NanosecondBuilder::with_capacity(capacity)) + } + _ => { + return Err(Error::IllegalArgument { + message: format!("Unsupported Arrow type for Time: {arrow_type:?}"), + }); + } + }, + DataType::Timestamp(t) => { + let precision = t.precision(); + match arrow_type { + ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, _) => { + TypedWriter::TimestampNtzSecond { + precision, + builder: TimestampSecondBuilder::with_capacity(capacity), + } + } + ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => { + TypedWriter::TimestampNtzMillisecond { + precision, + builder: TimestampMillisecondBuilder::with_capacity(capacity), + } + } + ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => { + TypedWriter::TimestampNtzMicrosecond { + precision, + builder: TimestampMicrosecondBuilder::with_capacity(capacity), + } + } + ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => { + TypedWriter::TimestampNtzNanosecond { + precision, + builder: TimestampNanosecondBuilder::with_capacity(capacity), + } + } + _ => { + return Err(Error::IllegalArgument { + message: format!( + "Unsupported Arrow type for Timestamp: {arrow_type:?}" + ), + }); + } + } + } + DataType::TimestampLTz(t) => { + let precision = t.precision(); + match arrow_type { + ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, _) => { + TypedWriter::TimestampLtzSecond { + precision, + builder: TimestampSecondBuilder::with_capacity(capacity), + } + } + ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => { + TypedWriter::TimestampLtzMillisecond { + precision, + builder: TimestampMillisecondBuilder::with_capacity(capacity), + } + } + ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => { + TypedWriter::TimestampLtzMicrosecond { + precision, + builder: TimestampMicrosecondBuilder::with_capacity(capacity), + } + } + ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => { + TypedWriter::TimestampLtzNanosecond { + precision, + builder: TimestampNanosecondBuilder::with_capacity(capacity), + } + } + _ => { + return Err(Error::IllegalArgument { + message: format!( + "Unsupported Arrow type for TimestampLTz: {arrow_type:?}" + ), + }); + } + } + } + _ => { + return Err(Error::IllegalArgument { + message: format!("Unsupported Fluss DataType: {fluss_type:?}"), + }); + } + }; + + Ok(Self { + pos, + nullable, + inner, + }) + } + + /// Read one value from `row` at this writer's column position and append it + /// directly to the concrete Arrow builder. + #[inline] + pub fn write_field(&mut self, row: &dyn InternalRow) -> Result<()> { + if self.nullable && row.is_null_at(self.pos)? { + self.append_null(); + return Ok(()); + } + self.write_non_null(row) + } + + /// Finish the builder, producing the final Arrow array. + pub fn finish(&mut self) -> ArrayRef { + self.as_builder_mut().finish() + } + + /// Clone-finish the builder for size estimation (does not reset the builder). + pub fn finish_cloned(&self) -> ArrayRef { + self.as_builder_ref().finish_cloned() + } + + fn append_null(&mut self) { + // Every concrete Arrow builder has `append_null()` but it is NOT on the + // `ArrayBuilder` trait, so we dispatch through the enum. Exhaustive + // matching ensures new variants won't compile without a null arm. + macro_rules! null { + ($b:expr) => { + $b.append_null() + }; + } + match &mut self.inner { + TypedWriter::Bool(b) => null!(b), Review Comment: Nit: i wonder if we can use macro to avoid code repetition, seems we have it in 3 places? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
