Copilot commented on code in PR #196:
URL: https://github.com/apache/fluss-rust/pull/196#discussion_r2714999454


##########
crates/fluss/src/row/datum.rs:
##########
@@ -298,16 +445,186 @@ impl Datum<'_> {
             Datum::Float64(v) => append_value_to_arrow!(Float64Builder, 
v.into_inner()),
             Datum::String(v) => append_value_to_arrow!(StringBuilder, 
v.as_ref()),
             Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, 
v.as_ref()),
-            Datum::Decimal(_)
-            | Datum::Date(_)
-            | Datum::Time(_)
-            | Datum::TimestampNtz(_)
-            | Datum::TimestampLtz(_) => {
+            Datum::Decimal(decimal) => {
+                // Extract target precision and scale from Arrow schema
+                let (target_precision, target_scale) = match data_type {
+                    arrow_schema::DataType::Decimal128(p, s) => (*p as u32, *s 
as u32),
+                    _ => {
+                        return Err(RowConvertError {
+                            message: format!(
+                                "Expected Decimal128 Arrow type, got: {:?}",
+                                data_type
+                            ),
+                        });
+                    }
+                };
+
+                if let Some(b) = 
builder.as_any_mut().downcast_mut::<Decimal128Builder>() {
+                    use bigdecimal::RoundingMode;
+
+                    // Rescale the decimal to match Arrow's target scale
+                    let bd = decimal.to_big_decimal();
+                    let rescaled = bd.with_scale_round(target_scale as i64, 
RoundingMode::HalfUp);
+                    let (unscaled, _) = rescaled.as_bigint_and_exponent();
+
+                    // Validate precision
+                    let actual_precision = 
Decimal::compute_precision(&unscaled);
+                    if actual_precision > target_precision as usize {
+                        return Err(RowConvertError {
+                            message: format!(
+                                "Decimal precision overflow: value has {} 
digits but Arrow expects {} (value: {})",
+                                actual_precision, target_precision, rescaled
+                            ),
+                        });
+                    }
+
+                    // Convert to i128 for Arrow
+                    let i128_val: i128 = match unscaled.try_into() {
+                        Ok(v) => v,
+                        Err(_) => {
+                            return Err(RowConvertError {
+                                message: format!("Decimal value exceeds i128 
range: {}", rescaled),
+                            });
+                        }
+                    };
+
+                    b.append_value(i128_val);
+                    return Ok(());
+                }
+
                 return Err(RowConvertError {
-                    message: format!(
-                        "Type {:?} is not yet supported for Arrow conversion",
-                        std::mem::discriminant(self)
-                    ),
+                    message: "Builder type mismatch for 
Decimal128".to_string(),
+                });
+            }
+            Datum::Date(date) => {
+                append_value_to_arrow!(Date32Builder, date.get_inner());
+            }
+            Datum::Time(time) => {
+                // Time is stored as milliseconds since midnight in Fluss
+                // Convert to Arrow's time unit based on schema
+                let millis = time.get_inner();
+
+                match data_type {
+                    
arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Second) => {
+                        if let Some(b) = 
builder.as_any_mut().downcast_mut::<Int32Builder>() {
+                            b.append_value(millis / MILLIS_PER_SECOND as i32);
+                            return Ok(());
+                        }
+                    }
+                    
arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
+                        if let Some(b) = 
builder.as_any_mut().downcast_mut::<Int32Builder>() {
+                            b.append_value(millis);
+                            return Ok(());
+                        }
+                    }
+                    
arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
+                        if let Some(b) = 
builder.as_any_mut().downcast_mut::<Int64Builder>() {
+                            b.append_value(millis as i64 * MICROS_PER_MILLI);
+                            return Ok(());
+                        }
+                    }
+                    
arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
+                        if let Some(b) = 
builder.as_any_mut().downcast_mut::<Int64Builder>() {
+                            b.append_value(millis as i64 * NANOS_PER_MILLI);
+                            return Ok(());
+                        }
+                    }
+                    _ => {
+                        return Err(RowConvertError {
+                            message: format!(
+                                "Expected Time32/Time64 Arrow type, got: {:?}",
+                                data_type
+                            ),
+                        });
+                    }
+                }
+
+                return Err(RowConvertError {
+                    message: "Builder type mismatch for Time".to_string(),
+                });
+            }
+            Datum::TimestampNtz(ts) => {
+                let millis = ts.get_millisecond();
+                let nanos = ts.get_nano_of_millisecond();
+
+                if let Some(b) = builder
+                    .as_any_mut()
+                    .downcast_mut::<TimestampSecondBuilder>()
+                {
+                    // Convert milliseconds to seconds
+                    b.append_value(millis / MILLIS_PER_SECOND);
+                    return Ok(());
+                }
+                if let Some(b) = builder
+                    .as_any_mut()
+                    .downcast_mut::<TimestampMillisecondBuilder>()
+                {
+                    b.append_value(millis);
+                    return Ok(());
+                }
+                if let Some(b) = builder
+                    .as_any_mut()
+                    .downcast_mut::<TimestampMicrosecondBuilder>()
+                {
+                    // Convert milliseconds + nanos to microseconds
+                    let micros = millis * MICROS_PER_MILLI + (nanos as i64) / 
MICROS_PER_MILLI;
+                    b.append_value(micros);

Review Comment:
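   The timestamp unit conversions multiply `millis` by `MICROS_PER_MILLI`
(here) and by `NANOS_PER_MILLI` (in the nanosecond branch) using plain `*` and
`+`, so nothing guards against `i64` overflow. A large or corrupted timestamp
will panic when overflow checks are enabled (the default in debug builds) and
silently wrap otherwise (the default in release builds), corrupting the
serialized value. For example, assuming `NANOS_PER_MILLI` is 1_000_000, the
nanosecond conversion overflows once `|millis|` exceeds roughly 9.2e12, i.e.
for dates more than about 292 years from the epoch. Use
`checked_mul`/`checked_add` and return a `RowConvertError` on overflow.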



##########
crates/fluss/src/row/datum.rs:
##########
@@ -298,16 +445,186 @@ impl Datum<'_> {
             Datum::Float64(v) => append_value_to_arrow!(Float64Builder, 
v.into_inner()),
             Datum::String(v) => append_value_to_arrow!(StringBuilder, 
v.as_ref()),
             Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, 
v.as_ref()),
-            Datum::Decimal(_)
-            | Datum::Date(_)
-            | Datum::Time(_)
-            | Datum::TimestampNtz(_)
-            | Datum::TimestampLtz(_) => {
+            Datum::Decimal(decimal) => {
+                // Extract target precision and scale from Arrow schema
+                let (target_precision, target_scale) = match data_type {
+                    arrow_schema::DataType::Decimal128(p, s) => (*p as u32, *s 
as u32),
+                    _ => {

Review Comment:
   `DataType::Decimal128` stores `scale` as an `i8`, and Arrow allows negative
scales. Casting `*s` to `u32` turns a negative scale into a huge positive
number (e.g. `-1i8 as u32` is 4294967295), so `with_scale_round` is asked to
rescale to billions of fractional digits, which yields a wrong value and most
likely an enormous allocation or an overflow. Keep `target_scale` signed, e.g.
`i64::from(*s)`, and pass that to `with_scale_round` without going through an
unsigned type; also consider validating that the scale is in the range
supported by your `Decimal` representation.



##########
crates/fluss/src/row/datum.rs:
##########
@@ -298,16 +445,186 @@ impl Datum<'_> {
             Datum::Float64(v) => append_value_to_arrow!(Float64Builder, 
v.into_inner()),
             Datum::String(v) => append_value_to_arrow!(StringBuilder, 
v.as_ref()),
             Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, 
v.as_ref()),
-            Datum::Decimal(_)
-            | Datum::Date(_)
-            | Datum::Time(_)
-            | Datum::TimestampNtz(_)
-            | Datum::TimestampLtz(_) => {
+            Datum::Decimal(decimal) => {
+                // Extract target precision and scale from Arrow schema
+                let (target_precision, target_scale) = match data_type {
+                    arrow_schema::DataType::Decimal128(p, s) => (*p as u32, *s 
as u32),
+                    _ => {
+                        return Err(RowConvertError {
+                            message: format!(
+                                "Expected Decimal128 Arrow type, got: {:?}",
+                                data_type
+                            ),
+                        });
+                    }
+                };
+
+                if let Some(b) = 
builder.as_any_mut().downcast_mut::<Decimal128Builder>() {
+                    use bigdecimal::RoundingMode;
+
+                    // Rescale the decimal to match Arrow's target scale
+                    let bd = decimal.to_big_decimal();
+                    let rescaled = bd.with_scale_round(target_scale as i64, 
RoundingMode::HalfUp);
+                    let (unscaled, _) = rescaled.as_bigint_and_exponent();
+
+                    // Validate precision
+                    let actual_precision = 
Decimal::compute_precision(&unscaled);
+                    if actual_precision > target_precision as usize {
+                        return Err(RowConvertError {
+                            message: format!(
+                                "Decimal precision overflow: value has {} 
digits but Arrow expects {} (value: {})",
+                                actual_precision, target_precision, rescaled
+                            ),
+                        });
+                    }
+
+                    // Convert to i128 for Arrow
+                    let i128_val: i128 = match unscaled.try_into() {
+                        Ok(v) => v,
+                        Err(_) => {
+                            return Err(RowConvertError {
+                                message: format!("Decimal value exceeds i128 
range: {}", rescaled),
+                            });
+                        }
+                    };
+
+                    b.append_value(i128_val);
+                    return Ok(());
+                }
+
                 return Err(RowConvertError {
-                    message: format!(
-                        "Type {:?} is not yet supported for Arrow conversion",
-                        std::mem::discriminant(self)
-                    ),
+                    message: "Builder type mismatch for 
Decimal128".to_string(),
+                });
+            }
+            Datum::Date(date) => {
+                append_value_to_arrow!(Date32Builder, date.get_inner());
+            }
+            Datum::Time(time) => {
+                // Time is stored as milliseconds since midnight in Fluss
+                // Convert to Arrow's time unit based on schema
+                let millis = time.get_inner();
+
+                match data_type {
+                    
arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Second) => {
+                        if let Some(b) = 
builder.as_any_mut().downcast_mut::<Int32Builder>() {
+                            b.append_value(millis / MILLIS_PER_SECOND as i32);
+                            return Ok(());
+                        }
+                    }
+                    
arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
+                        if let Some(b) = 
builder.as_any_mut().downcast_mut::<Int32Builder>() {
+                            b.append_value(millis);
+                            return Ok(());
+                        }
+                    }
+                    
arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
+                        if let Some(b) = 
builder.as_any_mut().downcast_mut::<Int64Builder>() {
+                            b.append_value(millis as i64 * MICROS_PER_MILLI);
+                            return Ok(());
+                        }
+                    }
+                    
arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
+                        if let Some(b) = 
builder.as_any_mut().downcast_mut::<Int64Builder>() {
+                            b.append_value(millis as i64 * NANOS_PER_MILLI);
+                            return Ok(());
+                        }
+                    }
+                    _ => {
+                        return Err(RowConvertError {
+                            message: format!(
+                                "Expected Time32/Time64 Arrow type, got: {:?}",
+                                data_type
+                            ),
+                        });
+                    }
+                }
+
+                return Err(RowConvertError {
+                    message: "Builder type mismatch for Time".to_string(),
+                });
+            }
+            Datum::TimestampNtz(ts) => {
+                let millis = ts.get_millisecond();
+                let nanos = ts.get_nano_of_millisecond();
+
+                if let Some(b) = builder
+                    .as_any_mut()
+                    .downcast_mut::<TimestampSecondBuilder>()
+                {
+                    // Convert milliseconds to seconds
+                    b.append_value(millis / MILLIS_PER_SECOND);
+                    return Ok(());
+                }
+                if let Some(b) = builder
+                    .as_any_mut()
+                    .downcast_mut::<TimestampMillisecondBuilder>()
+                {
+                    b.append_value(millis);
+                    return Ok(());
+                }
+                if let Some(b) = builder
+                    .as_any_mut()
+                    .downcast_mut::<TimestampMicrosecondBuilder>()
+                {
+                    // Convert milliseconds + nanos to microseconds
+                    let micros = millis * MICROS_PER_MILLI + (nanos as i64) / 
MICROS_PER_MILLI;
+                    b.append_value(micros);
+                    return Ok(());
+                }
+                if let Some(b) = builder
+                    .as_any_mut()
+                    .downcast_mut::<TimestampNanosecondBuilder>()
+                {
+                    // Convert milliseconds + nanos to nanoseconds
+                    let nanos_total = millis * NANOS_PER_MILLI + nanos as i64;
+                    b.append_value(nanos_total);
+                    return Ok(());
+                }
+
+                return Err(RowConvertError {
+                    message: "Builder type mismatch for 
TimestampNtz".to_string(),
+                });
+            }
+            Datum::TimestampLtz(ts) => {
+                let millis = ts.get_epoch_millisecond();
+                let nanos = ts.get_nano_of_millisecond();
+
+                if let Some(b) = builder
+                    .as_any_mut()
+                    .downcast_mut::<TimestampSecondBuilder>()
+                {
+                    // Convert milliseconds to seconds
+                    b.append_value(millis / MILLIS_PER_SECOND);
+                    return Ok(());
+                }
+                if let Some(b) = builder
+                    .as_any_mut()
+                    .downcast_mut::<TimestampMillisecondBuilder>()
+                {
+                    b.append_value(millis);
+                    return Ok(());
+                }
+                if let Some(b) = builder
+                    .as_any_mut()
+                    .downcast_mut::<TimestampMicrosecondBuilder>()
+                {
+                    // Convert milliseconds + nanos to microseconds
+                    let micros = millis * MICROS_PER_MILLI + (nanos as i64) / 
MICROS_PER_MILLI;
+                    b.append_value(micros);

Review Comment:
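   The timestamp unit conversions multiply `millis` by `MICROS_PER_MILLI`
(here) and by `NANOS_PER_MILLI` (in the nanosecond branch) using plain `*` and
`+`, so nothing guards against `i64` overflow. A large or corrupted timestamp
will panic when overflow checks are enabled (the default in debug builds) and
silently wrap otherwise (the default in release builds), corrupting the
serialized value. For example, assuming `NANOS_PER_MILLI` is 1_000_000, the
nanosecond conversion overflows once `|millis|` exceeds roughly 9.2e12, i.e.
for dates more than about 292 years from the epoch. Use
`checked_mul`/`checked_add` and return a `RowConvertError` on overflow.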



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to