luoyuxia commented on code in PR #196:
URL: https://github.com/apache/fluss-rust/pull/196#discussion_r2718975466
##########
crates/fluss/src/record/arrow.rs:
##########
@@ -172,61 +175,129 @@ pub struct RowAppendRecordBatchBuilder {
}
impl RowAppendRecordBatchBuilder {
- pub fn new(row_type: &RowType) -> Self {
- let schema_ref = to_arrow_schema(row_type);
- let builders = Mutex::new(
- schema_ref
- .fields()
- .iter()
- .map(|field| Self::create_builder(field.data_type()))
- .collect(),
- );
- Self {
+ pub fn new(row_type: &RowType) -> Result<Self> {
+ let schema_ref = to_arrow_schema(row_type)?;
+ let builders: Result<Vec<_>> = schema_ref
+ .fields()
+ .iter()
+ .map(|field| Self::create_builder(field.data_type()))
+ .collect();
+ Ok(Self {
table_schema: schema_ref.clone(),
- arrow_column_builders: builders,
+ arrow_column_builders: Mutex::new(builders?),
Review Comment:
I think we can remove the `Mutex` now, since we have already made
`ArrowLogWriteBatch#build` `mut`.
##########
crates/fluss/src/row/datum.rs:
##########
@@ -298,16 +450,297 @@ impl Datum<'_> {
Datum::Float64(v) => append_value_to_arrow!(Float64Builder,
v.into_inner()),
Datum::String(v) => append_value_to_arrow!(StringBuilder,
v.as_ref()),
Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder,
v.as_ref()),
- Datum::Decimal(_)
- | Datum::Date(_)
- | Datum::Time(_)
- | Datum::TimestampNtz(_)
- | Datum::TimestampLtz(_) => {
+ Datum::Decimal(decimal) => {
+ // Extract target precision and scale from Arrow schema
+ let (p, s) = match data_type {
+ arrow_schema::DataType::Decimal128(p, s) => (*p, *s),
+ _ => {
+ return Err(RowConvertError {
+ message: format!(
+ "Expected Decimal128 Arrow type, got: {:?}",
+ data_type
+ ),
+ });
+ }
+ };
+
+ // Validate scale is non-negative (Fluss doesn't support
negative scales)
+ if s < 0 {
+ return Err(RowConvertError {
+ message: format!("Negative decimal scale {} is not
supported", s),
+ });
+ }
+
+ let target_precision = p as u32;
+ let target_scale = s as i64; // Safe now: 0..127 → 0i64..127i64
+
+ if let Some(b) =
builder.as_any_mut().downcast_mut::<Decimal128Builder>() {
+ use bigdecimal::RoundingMode;
+
+ // Rescale the decimal to match Arrow's target scale
+ let bd = decimal.to_big_decimal();
+ let rescaled = bd.with_scale_round(target_scale,
RoundingMode::HalfUp);
+ let (unscaled, _) = rescaled.as_bigint_and_exponent();
+
+ // Validate precision
+ let actual_precision =
Decimal::compute_precision(&unscaled);
+ if actual_precision > target_precision as usize {
+ return Err(RowConvertError {
+ message: format!(
+ "Decimal precision overflow: value has {}
digits but Arrow expects {} (value: {})",
+ actual_precision, target_precision, rescaled
+ ),
+ });
+ }
+
+ // Convert to i128 for Arrow
+ let i128_val: i128 = match unscaled.try_into() {
+ Ok(v) => v,
+ Err(_) => {
+ return Err(RowConvertError {
+ message: format!("Decimal value exceeds i128
range: {}", rescaled),
+ });
+ }
+ };
+
+ b.append_value(i128_val);
+ return Ok(());
+ }
+
+ return Err(RowConvertError {
+ message: "Builder type mismatch for
Decimal128".to_string(),
+ });
+ }
+ Datum::Date(date) => {
+ append_value_to_arrow!(Date32Builder, date.get_inner());
+ }
+ Datum::Time(time) => {
+ // Time is stored as milliseconds since midnight in Fluss
+ // Convert to Arrow's time unit based on schema
+ let millis = time.get_inner();
+
+ match data_type {
+
arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Second) => {
+ if let Some(b) =
builder.as_any_mut().downcast_mut::<Time32SecondBuilder>()
+ {
+ // Validate no sub-second precision is lost
+ if millis % MILLIS_PER_SECOND as i32 != 0 {
+ return Err(RowConvertError {
+ message: format!(
+ "Time value {} ms has sub-second
precision but schema expects seconds only",
+ millis
+ ),
+ });
+ }
+ b.append_value(millis / MILLIS_PER_SECOND as i32);
+ return Ok(());
+ }
+ }
+
arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
+ if let Some(b) = builder
+ .as_any_mut()
+ .downcast_mut::<Time32MillisecondBuilder>()
+ {
+ b.append_value(millis);
+ return Ok(());
+ }
+ }
+
arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
+ if let Some(b) = builder
+ .as_any_mut()
+ .downcast_mut::<Time64MicrosecondBuilder>()
+ {
+ let micros = (millis as i64)
+ .checked_mul(MICROS_PER_MILLI)
+ .ok_or_else(|| RowConvertError {
+ message: format!(
+ "Time value {} ms overflows when
converting to microseconds",
+ millis
+ ),
+ })?;
+ b.append_value(micros);
+ return Ok(());
+ }
+ }
+
arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
+ if let Some(b) = builder
+ .as_any_mut()
+ .downcast_mut::<Time64NanosecondBuilder>()
+ {
+ let nanos = (millis as
i64).checked_mul(NANOS_PER_MILLI).ok_or_else(
+ || RowConvertError {
+ message: format!(
+ "Time value {} ms overflows when
converting to nanoseconds",
+ millis
+ ),
+ },
+ )?;
+ b.append_value(nanos);
+ return Ok(());
+ }
+ }
+ _ => {
+ return Err(RowConvertError {
+ message: format!(
+ "Expected Time32/Time64 Arrow type, got: {:?}",
+ data_type
+ ),
+ });
+ }
+ }
+
return Err(RowConvertError {
- message: format!(
- "Type {:?} is not yet supported for Arrow conversion",
- std::mem::discriminant(self)
- ),
+ message: "Builder type mismatch for Time".to_string(),
+ });
+ }
+ Datum::TimestampNtz(ts) => {
+ let millis = ts.get_millisecond();
+ let nanos = ts.get_nano_of_millisecond();
+
+ if let Some(b) = builder
+ .as_any_mut()
+ .downcast_mut::<TimestampSecondBuilder>()
+ {
+ // Convert milliseconds to seconds
+ b.append_value(millis / MILLIS_PER_SECOND);
+ return Ok(());
+ }
+ if let Some(b) = builder
+ .as_any_mut()
+ .downcast_mut::<TimestampMillisecondBuilder>()
+ {
+ b.append_value(millis);
+ return Ok(());
+ }
+ if let Some(b) = builder
+ .as_any_mut()
+ .downcast_mut::<TimestampMicrosecondBuilder>()
+ {
+ // Convert milliseconds + nanos to microseconds
Review Comment:
nit: can we extract the conversions to `microseconds` and to `nanoseconds` into
common methods so that `TimestampNtz` and `TimestampLtz` can share them?
##########
crates/fluss/src/row/datum.rs:
##########
@@ -246,19 +285,122 @@ impl TryFrom<&Datum<'_>> for i8 {
}
}
+impl TryFrom<&Datum<'_>> for Decimal {
+ type Error = ();
+
+ #[inline]
+ fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+ match from {
+ Datum::Decimal(d) => Ok(d.clone()),
+ _ => Err(()),
+ }
+ }
+}
+
+impl TryFrom<&Datum<'_>> for Date {
+ type Error = ();
+
+ #[inline]
+ fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+ match from {
+ Datum::Date(d) => Ok(*d),
+ _ => Err(()),
+ }
+ }
+}
+
+impl TryFrom<&Datum<'_>> for Time {
+ type Error = ();
+
+ #[inline]
+ fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+ match from {
+ Datum::Time(t) => Ok(*t),
+ _ => Err(()),
+ }
+ }
+}
+
+impl TryFrom<&Datum<'_>> for TimestampNtz {
+ type Error = ();
+
+ #[inline]
+ fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+ match from {
+ Datum::TimestampNtz(ts) => Ok(*ts),
+ _ => Err(()),
+ }
+ }
+}
+
+impl TryFrom<&Datum<'_>> for TimestampLtz {
+ type Error = ();
+
+ #[inline]
+ fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+ match from {
+ Datum::TimestampLtz(ts) => Ok(*ts),
+ _ => Err(()),
+ }
+ }
+}
Review Comment:
Considering `Date` is just wrapping an int, I think copying is fine for cleaner
code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]