leekeiabstraction commented on code in PR #196:
URL: https://github.com/apache/fluss-rust/pull/196#discussion_r2716411604


##########
crates/fluss/src/row/datum.rs:
##########
@@ -246,19 +285,122 @@ impl TryFrom<&Datum<'_>> for i8 {
     }
 }
 
+impl TryFrom<&Datum<'_>> for Decimal {
+    type Error = ();
+
+    #[inline]
+    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+        match from {
+            Datum::Decimal(d) => Ok(d.clone()),
+            _ => Err(()),
+        }
+    }
+}
+
+impl TryFrom<&Datum<'_>> for Date {
+    type Error = ();
+
+    #[inline]
+    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+        match from {
+            Datum::Date(d) => Ok(*d),
+            _ => Err(()),
+        }
+    }
+}
+
+impl TryFrom<&Datum<'_>> for Time {
+    type Error = ();
+
+    #[inline]
+    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+        match from {
+            Datum::Time(t) => Ok(*t),
+            _ => Err(()),
+        }
+    }
+}
+
+impl TryFrom<&Datum<'_>> for TimestampNtz {
+    type Error = ();
+
+    #[inline]
+    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+        match from {
+            Datum::TimestampNtz(ts) => Ok(*ts),
+            _ => Err(()),
+        }
+    }
+}
+
+impl TryFrom<&Datum<'_>> for TimestampLtz {
+    type Error = ();
+
+    #[inline]
+    fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+        match from {
+            Datum::TimestampLtz(ts) => Ok(*ts),
+            _ => Err(()),
+        }
+    }
+}

Review Comment:
   Should we prefer a non-copy (borrowing) variant, like the one for `&'b str`?
   
   ```
   impl <'b, 'a: 'b> TryFrom<&'b Datum<'a>> for &'b Date {
       type Error = ();
   
       #[inline]
       fn try_from(from: &'b Datum<'a>) -> std::result::Result<Self, 
Self::Error> {
           match from {
               Datum::Date(d) => Ok(d),
               _ => Err(()),
           }
       }
   }
   ```



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -172,61 +175,129 @@ pub struct RowAppendRecordBatchBuilder {
 }
 
 impl RowAppendRecordBatchBuilder {
-    pub fn new(row_type: &RowType) -> Self {
-        let schema_ref = to_arrow_schema(row_type);
-        let builders = Mutex::new(
-            schema_ref
-                .fields()
-                .iter()
-                .map(|field| Self::create_builder(field.data_type()))
-                .collect(),
-        );
-        Self {
+    pub fn new(row_type: &RowType) -> Result<Self> {
+        let schema_ref = to_arrow_schema(row_type)?;
+        let builders: Result<Vec<_>> = schema_ref
+            .fields()
+            .iter()
+            .map(|field| Self::create_builder(field.data_type()))
+            .collect();
+        Ok(Self {
             table_schema: schema_ref.clone(),
-            arrow_column_builders: builders,
+            arrow_column_builders: Mutex::new(builders?),

Review Comment:
   Can you clarify why the `Mutex` is necessary here?



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -1249,4 +1388,201 @@ mod tests {
         }
         out
     }
+
+    #[test]
+    fn test_temporal_and_decimal_builder_validation() {
+        // Test valid builder creation
+        let builder =
+            
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(10, 
2)).unwrap();
+        assert!(
+            builder
+                .as_any()
+                .downcast_ref::<Decimal128Builder>()
+                .is_some()

Review Comment:
   Should we also assert the precision and scale?



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -1249,4 +1388,201 @@ mod tests {
         }
         out
     }
+
+    #[test]
+    fn test_temporal_and_decimal_builder_validation() {
+        // Test valid builder creation
+        let builder =
+            
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(10, 
2)).unwrap();
+        assert!(
+            builder
+                .as_any()
+                .downcast_ref::<Decimal128Builder>()
+                .is_some()
+        );
+
+        // Test error case: invalid precision/scale
+        let result =
+            
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(100, 
50));
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_decimal_rescaling_and_validation() -> Result<()> {
+        use crate::row::{Datum, Decimal, GenericRow};
+        use arrow::array::Decimal128Array;
+        use bigdecimal::BigDecimal;
+        use std::str::FromStr;
+
+        // Test 1: Rescaling from scale 3 to scale 2
+        let row_type = RowType::new(vec![DataField::new(
+            "amount".to_string(),
+            DataTypes::decimal(10, 2),
+            None,
+        )]);
+        let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
+        let decimal = 
Decimal::from_big_decimal(BigDecimal::from_str("123.456").unwrap(), 10, 3)?;
+        builder.append(&GenericRow {
+            values: vec![Datum::Decimal(decimal)],
+        })?;
+        let batch = builder.build_arrow_record_batch()?;
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap();
+        assert_eq!(array.value(0), 12346); // 123.46 rounded
+        assert_eq!(array.scale(), 2);
+
+        // Test 2: Precision overflow (should error)
+        let row_type = RowType::new(vec![DataField::new(
+            "amount".to_string(),
+            DataTypes::decimal(5, 2),
+            None,
+        )]);
+        let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
+        let decimal = 
Decimal::from_big_decimal(BigDecimal::from_str("123456.78").unwrap(), 10, 2)?;
+        let result = builder.append(&GenericRow {
+            values: vec![Datum::Decimal(decimal)],
+        });
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("precision overflow")
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_temporal_types_end_to_end() -> Result<()> {
+        use crate::row::{Date, Datum, Decimal, GenericRow, Time, TimestampLtz, 
TimestampNtz};
+        use arrow::array::{
+            Date32Array, Decimal128Array, Int32Array, Time32MillisecondArray,
+            Time64NanosecondArray, TimestampMicrosecondArray, 
TimestampNanosecondArray,
+        };
+        use bigdecimal::BigDecimal;
+        use std::str::FromStr;
+
+        // Schema with decimal, date, time (ms + ns), timestamps (μs + ns)
+        let row_type = RowType::new(vec![
+            DataField::new("id".to_string(), DataTypes::int(), None),
+            DataField::new("amount".to_string(), DataTypes::decimal(10, 2), 
None),
+            DataField::new("date".to_string(), DataTypes::date(), None),
+            DataField::new(
+                "time_ms".to_string(),
+                DataTypes::time_with_precision(3),
+                None,
+            ),
+            DataField::new(
+                "time_ns".to_string(),
+                DataTypes::time_with_precision(9),
+                None,
+            ),
+            DataField::new(
+                "ts_us".to_string(),
+                DataTypes::timestamp_with_precision(6),
+                None,
+            ),
+            DataField::new(
+                "ts_ltz_ns".to_string(),
+                DataTypes::timestamp_ltz_with_precision(9),
+                None,
+            ),
+        ]);
+
+        let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
+
+        // Append rows with temporal values
+        builder.append(&GenericRow {
+            values: vec![
+                Datum::Int32(1),
+                Datum::Decimal(Decimal::from_big_decimal(
+                    BigDecimal::from_str("123.456").unwrap(),
+                    10,
+                    3,
+                )?),
+                Datum::Date(Date::new(18000)),
+                Datum::Time(Time::new(43200000)),
+                Datum::Time(Time::new(12345)),

Review Comment:
   What do these values mean as human-readable dates/times?



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -1249,4 +1388,201 @@ mod tests {
         }
         out
     }
+
+    #[test]
+    fn test_temporal_and_decimal_builder_validation() {
+        // Test valid builder creation
+        let builder =
+            
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(10, 
2)).unwrap();
+        assert!(
+            builder
+                .as_any()
+                .downcast_ref::<Decimal128Builder>()
+                .is_some()
+        );
+
+        // Test error case: invalid precision/scale
+        let result =
+            
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(100, 
50));
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_decimal_rescaling_and_validation() -> Result<()> {
+        use crate::row::{Datum, Decimal, GenericRow};
+        use arrow::array::Decimal128Array;
+        use bigdecimal::BigDecimal;
+        use std::str::FromStr;
+
+        // Test 1: Rescaling from scale 3 to scale 2
+        let row_type = RowType::new(vec![DataField::new(
+            "amount".to_string(),
+            DataTypes::decimal(10, 2),
+            None,
+        )]);
+        let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
+        let decimal = 
Decimal::from_big_decimal(BigDecimal::from_str("123.456").unwrap(), 10, 3)?;
+        builder.append(&GenericRow {
+            values: vec![Datum::Decimal(decimal)],
+        })?;
+        let batch = builder.build_arrow_record_batch()?;
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap();
+        assert_eq!(array.value(0), 12346); // 123.46 rounded

Review Comment:
   Should this comment instead say that 123.456 is rounded to 2 decimal places?



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -1249,4 +1388,201 @@ mod tests {
         }
         out
     }
+
+    #[test]
+    fn test_temporal_and_decimal_builder_validation() {
+        // Test valid builder creation
+        let builder =
+            
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(10, 
2)).unwrap();
+        assert!(
+            builder
+                .as_any()
+                .downcast_ref::<Decimal128Builder>()
+                .is_some()
+        );
+
+        // Test error case: invalid precision/scale
+        let result =
+            
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(100, 
50));
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_decimal_rescaling_and_validation() -> Result<()> {
+        use crate::row::{Datum, Decimal, GenericRow};
+        use arrow::array::Decimal128Array;
+        use bigdecimal::BigDecimal;
+        use std::str::FromStr;
+
+        // Test 1: Rescaling from scale 3 to scale 2
+        let row_type = RowType::new(vec![DataField::new(
+            "amount".to_string(),
+            DataTypes::decimal(10, 2),
+            None,
+        )]);
+        let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
+        let decimal = 
Decimal::from_big_decimal(BigDecimal::from_str("123.456").unwrap(), 10, 3)?;
+        builder.append(&GenericRow {
+            values: vec![Datum::Decimal(decimal)],
+        })?;
+        let batch = builder.build_arrow_record_batch()?;
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap();
+        assert_eq!(array.value(0), 12346); // 123.46 rounded
+        assert_eq!(array.scale(), 2);
+
+        // Test 2: Precision overflow (should error)
+        let row_type = RowType::new(vec![DataField::new(
+            "amount".to_string(),
+            DataTypes::decimal(5, 2),
+            None,
+        )]);
+        let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
+        let decimal = 
Decimal::from_big_decimal(BigDecimal::from_str("123456.78").unwrap(), 10, 2)?;
+        let result = builder.append(&GenericRow {
+            values: vec![Datum::Decimal(decimal)],
+        });
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("precision overflow")
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_temporal_types_end_to_end() -> Result<()> {

Review Comment:
   We seem to be testing int and decimal in this test case as well, despite the name indicating temporal types only.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to