leekeiabstraction commented on code in PR #175:
URL: https://github.com/apache/fluss-rust/pull/175#discussion_r2704338797


##########
crates/fluss/src/row/encode/compacted_key_encoder.rs:
##########
@@ -238,86 +238,118 @@ mod tests {
     }
 
     #[test]
-    fn test_all_data_types() {
+    fn test_all_data_types_java_compatible() {
+        // Test encoding compatibility with Java using reference from:
+        // 
https://github.com/apache/fluss/blob/main/fluss-common/src/test/resources/encoding/encoded_key.hex
+        use crate::metadata::{DataType, TimestampLTzType, TimestampType};
+
         let row_type = RowType::with_data_types(vec![
-            DataTypes::boolean(),
-            DataTypes::tinyint(),
-            DataTypes::smallint(),
-            DataTypes::int(),
-            DataTypes::bigint(),
-            DataTypes::float(),
-            DataTypes::double(),
-            // TODO Date
-            // TODO Time
-            DataTypes::binary(20),
-            DataTypes::bytes(),
-            DataTypes::char(2),
-            DataTypes::string(),
-            // TODO Decimal
-            // TODO Timestamp
-            // TODO Timestamp LTZ
-            // TODO Array of Int
-            // TODO Array of Float
-            // TODO Array of String
-            // TODO: Add Map and Row fields in Issue #1973
+            DataTypes::boolean(),                                              
// BOOLEAN
+            DataTypes::tinyint(),                                              
// TINYINT
+            DataTypes::smallint(),                                             
// SMALLINT
+            DataTypes::int(),                                                  
// INT
+            DataTypes::bigint(),                                               
// BIGINT
+            DataTypes::float(),                                                
// FLOAT
+            DataTypes::double(),                                               
// DOUBLE
+            DataTypes::date(),                                                 
// DATE
+            DataTypes::time(),                                                 
// TIME
+            DataTypes::binary(20),                                             
// BINARY(20)
+            DataTypes::bytes(),                                                
// BYTES
+            DataTypes::char(2),                                                
// CHAR(2)
+            DataTypes::string(),                                               
// STRING
+            DataTypes::decimal(5, 2),                                          
// DECIMAL(5,2)
+            DataTypes::decimal(20, 0),                                         
// DECIMAL(20,0)
+            DataType::Timestamp(TimestampType::with_nullable(false, 1)),       
// TIMESTAMP(1)
+            DataType::Timestamp(TimestampType::with_nullable(false, 5)),       
// TIMESTAMP(5)
+            DataType::TimestampLTz(TimestampLTzType::with_nullable(false, 1)), 
// TIMESTAMP_LTZ(1)
+            DataType::TimestampLTz(TimestampLTzType::with_nullable(false, 5)), 
// TIMESTAMP_LTZ(5)

Review Comment:
   nit, leave the TODOs here for ARRAY / MAP etc.



##########
crates/fluss/src/row/compacted/compacted_row.rs:
##########
@@ -187,70 +207,92 @@ mod tests {
         assert_eq!(row.get_string(7), "Hello World");
         assert_eq!(row.get_bytes(8), &[1, 2, 3, 4, 5]);
 
-        // Test with nulls
-        let row_type = RowType::with_data_types(
-            [
-                DataType::Int(IntType::new()),
-                DataType::String(StringType::new()),
-                DataType::Double(DoubleType::new()),
-            ]
-            .to_vec(),
-        );
+        // Test with nulls and negative values
+        let row_type = RowType::with_data_types(vec![
+            DataType::Int(IntType::new()),
+            DataType::String(StringType::new()),
+            DataType::Double(DoubleType::new()),
+        ]);
 
         let mut writer = CompactedRowWriter::new(row_type.fields().len());
-
-        writer.write_int(100);
+        writer.write_int(-42);
         writer.set_null_at(1);
         writer.write_double(2.71);
 
         let bytes = writer.to_bytes();
-        row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
+        let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
 
         assert!(!row.is_null_at(0));
         assert!(row.is_null_at(1));
         assert!(!row.is_null_at(2));
-        assert_eq!(row.get_int(0), 100);
+        assert_eq!(row.get_int(0), -42);
         assert_eq!(row.get_double(2), 2.71);
+        // Verify caching works on repeated reads
+        assert_eq!(row.get_int(0), -42);
+    }
 
-        // Test multiple reads (caching)
-        assert_eq!(row.get_int(0), 100);
-        assert_eq!(row.get_int(0), 100);
+    #[test]
+    fn test_compacted_row_temporal_and_decimal_types() {
+        // Comprehensive test covering DATE, TIME, TIMESTAMP 
(compact/non-compact), and DECIMAL (compact/non-compact)
+        use crate::metadata::{DataTypes, DecimalType, TimestampLTzType, 
TimestampType};
+        use crate::row::Decimal;
+        use crate::row::datum::{TimestampLtz, TimestampNtz};
+        use bigdecimal::{BigDecimal, num_bigint::BigInt};
 
-        // Test from_bytes
         let row_type = RowType::with_data_types(vec![
-            DataType::Int(IntType::new()),
-            DataType::String(StringType::new()),
+            DataTypes::date(),
+            DataTypes::time(),
+            DataType::Timestamp(TimestampType::with_nullable(true, 3)), // 
Compact (precision <= 3)
+            DataType::TimestampLTz(TimestampLTzType::with_nullable(true, 3)), 
// Compact
+            DataType::Timestamp(TimestampType::with_nullable(true, 6)), // 
Non-compact (precision > 3)
+            DataType::TimestampLTz(TimestampLTzType::with_nullable(true, 9)), 
// Non-compact
+            DataType::Decimal(DecimalType::new(10, 2)),                 // 
Compact (precision <= 18)
+            DataType::Decimal(DecimalType::new(28, 10)), // Non-compact 
(precision > 18)
         ]);
 
         let mut writer = CompactedRowWriter::new(row_type.fields().len());
-        writer.write_int(-1);
-        writer.write_string("test");
-
-        let bytes = writer.to_bytes();
-        let mut row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
-
-        assert_eq!(row.get_int(0), -1);
-        assert_eq!(row.get_string(1), "test");
 
-        // Test large row
-        let num_fields = 100;
-        let row_type = RowType::with_data_types(
-            (0..num_fields)
-                .map(|_| DataType::Int(IntType::new()))
-                .collect(),
-        );
-
-        let mut writer = CompactedRowWriter::new(num_fields);
+        // Write values
+        writer.write_int(19651); // Date: 2023-10-25
+        writer.write_time(34200000, 0); // Time: 09:30:00.0
+        writer.write_timestamp_ntz(&TimestampNtz::new(1698235273182), 3); // 
Compact timestamp
+        writer.write_timestamp_ltz(&TimestampLtz::new(1698235273182), 3); // 
Compact timestamp ltz
+        let ts_ntz_high = TimestampNtz::from_millis_nanos(1698235273182, 
123456).unwrap();
+        let ts_ltz_high = TimestampLtz::from_millis_nanos(1698235273182, 
987654).unwrap();
+        writer.write_timestamp_ntz(&ts_ntz_high, 6); // Non-compact timestamp 
with nanos
+        writer.write_timestamp_ltz(&ts_ltz_high, 9); // Non-compact timestamp 
ltz with nanos
+
+        // Create Decimal values for testing
+        let small_decimal =
+            Decimal::from_big_decimal(BigDecimal::new(BigInt::from(12345), 2), 
10, 2).unwrap(); // Compact decimal: 123.45
+        let large_decimal = Decimal::from_big_decimal(
+            BigDecimal::new(BigInt::from(999999999999999999i128), 10),
+            28,
+            10,
+        )
+        .unwrap(); // Non-compact decimal
 
-        for i in 0..num_fields {
-            writer.write_int((i * 10) as i32);
-        }
+        writer.write_decimal(&small_decimal, 10);
+        writer.write_decimal(&large_decimal, 28);
 
         let bytes = writer.to_bytes();
-        row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
-
-        for i in 0..num_fields {
-            assert_eq!(row.get_int(i), (i * 10) as i32);
-        }
+        let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
+
+        // Verify all values
+        assert_eq!(row.get_date(0).get_inner(), 19651);
+        assert_eq!(row.get_time(1).get_inner(), 34200000);
+        assert_eq!(row.get_timestamp_ntz(2, 3).get_millisecond(), 
1698235273182);
+        assert_eq!(
+            row.get_timestamp_ltz(3, 3).get_epoch_millisecond(),
+            1698235273182
+        );
+        let read_ts_ntz = row.get_timestamp_ntz(4, 6);
+        assert_eq!(read_ts_ntz.get_millisecond(), 1698235273182);
+        assert_eq!(read_ts_ntz.get_nano_of_millisecond(), 123456);
+        let read_ts_ltz = row.get_timestamp_ltz(5, 9);
+        assert_eq!(read_ts_ltz.get_epoch_millisecond(), 1698235273182);
+        assert_eq!(read_ts_ltz.get_nano_of_millisecond(), 987654);
+        assert_eq!(row.get_decimal(6, 10, 2), small_decimal);
+        assert_eq!(row.get_decimal(7, 28, 10), large_decimal);
     }

Review Comment:
   Should we assert on absolute value of unscaled long, precision and scale? 
This way any regression within Decimal can be caught



##########
crates/fluss/src/metadata/json_serde.rs:
##########
@@ -202,29 +202,51 @@ impl JsonSerde for DataType {
                     .get(Self::FIELD_NAME_SCALE)
                     .and_then(|v| v.as_u64())
                     .unwrap_or(0) as u32;
-                DataTypes::decimal(precision, scale)
+                DataType::Decimal(
+                    crate::metadata::datatype::DecimalType::try_with_nullable(
+                        true, precision, scale,
+                    )
+                    .map_err(|e| Error::JsonSerdeError {
+                        message: format!("Invalid DECIMAL parameters: {}", e),
+                    })?,
+                )
             }
             "DATE" => DataTypes::date(),
             "TIME_WITHOUT_TIME_ZONE" => {
                 let precision = node
                     .get(Self::FIELD_NAME_PRECISION)
                     .and_then(|v| v.as_u64())
                     .unwrap_or(0) as u32;
-                DataTypes::time_with_precision(precision)
+                DataType::Time(
+                    
crate::metadata::datatype::TimeType::try_with_nullable(true, precision)
+                        .map_err(|e| Error::JsonSerdeError {
+                            message: format!("Invalid TIME precision: {}", e),
+                        })?,
+                )
             }
             "TIMESTAMP_WITHOUT_TIME_ZONE" => {
                 let precision = node
                     .get(Self::FIELD_NAME_PRECISION)
                     .and_then(|v| v.as_u64())
                     .unwrap_or(6) as u32;
-                DataTypes::timestamp_with_precision(precision)
+                DataType::Timestamp(
+                    
crate::metadata::datatype::TimestampType::try_with_nullable(true, precision)
+                        .map_err(|e| Error::JsonSerdeError {
+                            message: format!("Invalid TIMESTAMP precision: 
{}", e),
+                        })?,
+                )
             }
             "TIMESTAMP_WITH_LOCAL_TIME_ZONE" => {
                 let precision = node
                     .get(Self::FIELD_NAME_PRECISION)
                     .and_then(|v| v.as_u64())
                     .unwrap_or(6) as u32;
-                DataTypes::timestamp_ltz_with_precision(precision)
+                DataType::TimestampLTz(
+                    
crate::metadata::datatype::TimestampLTzType::try_with_nullable(true, precision)
+                        .map_err(|e| Error::JsonSerdeError {
+                            message: format!("Invalid TIMESTAMP_LTZ precision: 
{}", e),

Review Comment:
   Similarly, should this be saying `Invalid TIMESTAMP_WITH_LOCAL_TIME_ZONE 
precision` instead? 



##########
crates/fluss/src/metadata/json_serde.rs:
##########
@@ -202,29 +202,51 @@ impl JsonSerde for DataType {
                     .get(Self::FIELD_NAME_SCALE)
                     .and_then(|v| v.as_u64())
                     .unwrap_or(0) as u32;
-                DataTypes::decimal(precision, scale)
+                DataType::Decimal(
+                    crate::metadata::datatype::DecimalType::try_with_nullable(
+                        true, precision, scale,
+                    )
+                    .map_err(|e| Error::JsonSerdeError {
+                        message: format!("Invalid DECIMAL parameters: {}", e),
+                    })?,
+                )
             }
             "DATE" => DataTypes::date(),
             "TIME_WITHOUT_TIME_ZONE" => {
                 let precision = node
                     .get(Self::FIELD_NAME_PRECISION)
                     .and_then(|v| v.as_u64())
                     .unwrap_or(0) as u32;
-                DataTypes::time_with_precision(precision)
+                DataType::Time(
+                    
crate::metadata::datatype::TimeType::try_with_nullable(true, precision)
+                        .map_err(|e| Error::JsonSerdeError {
+                            message: format!("Invalid TIME precision: {}", e),
+                        })?,
+                )
             }
             "TIMESTAMP_WITHOUT_TIME_ZONE" => {
                 let precision = node
                     .get(Self::FIELD_NAME_PRECISION)
                     .and_then(|v| v.as_u64())
                     .unwrap_or(6) as u32;
-                DataTypes::timestamp_with_precision(precision)
+                DataType::Timestamp(
+                    
crate::metadata::datatype::TimestampType::try_with_nullable(true, precision)
+                        .map_err(|e| Error::JsonSerdeError {
+                            message: format!("Invalid TIMESTAMP precision: 
{}", e),

Review Comment:
   Similarly, should this be saying `Invalid TIMESTAMP_WITHOUT_TIME_ZONE 
precision` instead? 



##########
crates/fluss/src/row/decimal.rs:
##########
@@ -0,0 +1,462 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{Error, Result};
+use bigdecimal::num_bigint::BigInt;
+use bigdecimal::num_traits::Zero;
+use bigdecimal::{BigDecimal, RoundingMode};
+use std::fmt;
+
+#[cfg(test)]
+use std::str::FromStr;
+
+/// Maximum decimal precision that can be stored compactly as a single i64.
+/// Values with precision > MAX_COMPACT_PRECISION require byte array storage.
+pub const MAX_COMPACT_PRECISION: u32 = 18;
+
+/// An internal data structure representing a decimal value with fixed 
precision and scale.
+///
+/// This data structure is immutable and stores decimal values in a compact 
representation
+/// (as a long value) if values are small enough (precision ≤ 18).
+///
+/// Matches Java's org.apache.fluss.row.Decimal class.
+#[derive(Debug, Clone, serde::Serialize)]
+pub struct Decimal {
+    precision: u32,
+    scale: u32,
+    // If precision <= MAX_COMPACT_PRECISION, this holds the unscaled value
+    long_val: Option<i64>,
+    // BigDecimal representation (may be cached)
+    decimal_val: Option<BigDecimal>,
+}
+
+impl Decimal {
+    /// Returns the precision of this Decimal.
+    ///
+    /// The precision is the number of digits in the unscaled value.
+    pub fn precision(&self) -> u32 {
+        self.precision
+    }
+
+    /// Returns the scale of this Decimal.
+    pub fn scale(&self) -> u32 {
+        self.scale
+    }
+
+    /// Returns whether the decimal value is small enough to be stored in a 
long.
+    pub fn is_compact(&self) -> bool {
+        self.precision <= MAX_COMPACT_PRECISION
+    }
+
+    /// Returns whether a given precision can be stored compactly.
+    pub fn is_compact_precision(precision: u32) -> bool {
+        precision <= MAX_COMPACT_PRECISION
+    }
+
+    /// Converts this Decimal into a BigDecimal.
+    pub fn to_big_decimal(&self) -> BigDecimal {
+        if let Some(bd) = &self.decimal_val {
+            bd.clone()
+        } else if let Some(long_val) = self.long_val {
+            BigDecimal::new(BigInt::from(long_val), self.scale as i64)
+        } else {
+            // Should never happen - we always have one representation
+            BigDecimal::new(BigInt::from(0), self.scale as i64)
+        }
+    }
+
+    /// Returns a long describing the unscaled value of this Decimal.
+    pub fn to_unscaled_long(&self) -> Result<i64> {
+        if let Some(long_val) = self.long_val {
+            Ok(long_val)
+        } else {
+            // Extract unscaled value from BigDecimal
+            let bd = self.to_big_decimal();
+            let (unscaled, _) = bd.as_bigint_and_exponent();
+            unscaled.try_into().map_err(|_| Error::IllegalArgument {
+                message: format!(
+                    "Decimal unscaled value does not fit in i64: precision={}",
+                    self.precision
+                ),
+            })
+        }
+    }
+
+    /// Returns a byte array describing the unscaled value of this Decimal.
+    pub fn to_unscaled_bytes(&self) -> Vec<u8> {
+        let bd = self.to_big_decimal();
+        let (unscaled, _) = bd.as_bigint_and_exponent();
+        unscaled.to_signed_bytes_be()
+    }
+
+    /// Creates a Decimal from Arrow's Decimal128 representation.
+    // TODO: For compact decimals with matching scale we may call 
from_unscaled_long
+    pub fn from_arrow_decimal128(
+        i128_val: i128,
+        arrow_scale: i64,
+        precision: u32,
+        scale: u32,
+    ) -> Result<Self> {
+        let bd = BigDecimal::new(BigInt::from(i128_val), arrow_scale);
+        Self::from_big_decimal(bd, precision, scale)
+    }
+
+    /// Creates an instance of Decimal from a BigDecimal with the given 
precision and scale.
+    ///
+    /// The returned decimal value may be rounded to have the desired scale. 
The precision
+    /// will be checked. If the precision overflows, an error is returned.
+    pub fn from_big_decimal(bd: BigDecimal, precision: u32, scale: u32) -> 
Result<Self> {
+        // Rescale to the target scale with HALF_UP rounding (matches Java)
+        let scaled = bd.with_scale_round(scale as i64, RoundingMode::HalfUp);
+
+        // Extract unscaled value
+        let (unscaled, exp) = scaled.as_bigint_and_exponent();
+
+        // Sanity check that scale matches
+        debug_assert_eq!(
+            exp, scale as i64,
+            "Scaled decimal exponent ({}) != expected scale ({})",
+            exp, scale
+        );
+
+        let actual_precision = Self::compute_precision(&unscaled);
+        if actual_precision > precision as usize {
+            return Err(Error::IllegalArgument {
+                message: format!(
+                    "Decimal precision overflow: value has {} digits but 
precision is {} (value: {})",
+                    actual_precision, precision, scaled
+                ),
+            });
+        }
+
+        // Compute compact representation if possible
+        let long_val = if precision <= MAX_COMPACT_PRECISION {
+            Some(i64::try_from(&unscaled).map_err(|_| Error::IllegalArgument {
+                message: format!(
+                    "Decimal mantissa exceeds i64 range for compact precision 
{}: unscaled={} (value={})",
+                    precision, unscaled, scaled
+                ),
+            })?)
+        } else {
+            None
+        };
+
+        Ok(Decimal {
+            precision,
+            scale,
+            long_val,
+            decimal_val: Some(scaled),
+        })
+    }
+
+    /// Creates an instance of Decimal from an unscaled long value with the 
given precision and scale.
+    pub fn from_unscaled_long(unscaled_long: i64, precision: u32, scale: u32) 
-> Result<Self> {
+        if precision > MAX_COMPACT_PRECISION {
+            return Err(Error::IllegalArgument {
+                message: format!(
+                    "Precision {} exceeds MAX_COMPACT_PRECISION ({})",
+                    precision, MAX_COMPACT_PRECISION
+                ),
+            });
+        }
+
+        let actual_precision = 
Self::compute_precision(&BigInt::from(unscaled_long));
+        if actual_precision > precision as usize {
+            return Err(Error::IllegalArgument {
+                message: format!(
+                    "Decimal precision overflow: unscaled value has {} digits 
but precision is {}",
+                    actual_precision, precision
+                ),
+            });
+        }
+
+        Ok(Decimal {
+            precision,
+            scale,
+            long_val: Some(unscaled_long),
+            decimal_val: None,
+        })
+    }
+
+    /// Creates an instance of Decimal from an unscaled byte array with the 
given precision and scale.
+    pub fn from_unscaled_bytes(unscaled_bytes: &[u8], precision: u32, scale: 
u32) -> Result<Self> {
+        let unscaled = BigInt::from_signed_bytes_be(unscaled_bytes);
+        let bd = BigDecimal::new(unscaled, scale as i64);
+        Self::from_big_decimal(bd, precision, scale)
+    }
+
+    /// Computes the precision of a decimal's unscaled value, matching Java's 
BigDecimal.precision().
+    pub fn compute_precision(unscaled: &BigInt) -> usize {
+        if unscaled.is_zero() {
+            return 1;
+        }
+
+        // Count ALL digits in the unscaled value (matches Java's 
BigDecimal.precision())
+        // For bounded precision (≤ 38 digits), string conversion is cheap and 
simple.
+        unscaled.magnitude().to_str_radix(10).len()
+    }
+}
+
+impl fmt::Display for Decimal {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.to_big_decimal())
+    }
+}
+
+// Manual implementations of comparison traits to ignore cached fields
+impl PartialEq for Decimal {
+    fn eq(&self, other: &Self) -> bool {
+        // Use numeric equality like Java's Decimal.equals() which delegates 
to compareTo.
+        // This means 1.0 (scale=1) equals 1.00 (scale=2).
+        self.cmp(other) == std::cmp::Ordering::Equal
+    }
+}
+
+impl Eq for Decimal {}
+
+impl PartialOrd for Decimal {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for Decimal {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        // If both are compact and have the same scale, compare directly
+        if self.is_compact() && other.is_compact() && self.scale == 
other.scale {
+            self.long_val.cmp(&other.long_val)
+        } else {
+            // Otherwise, compare as BigDecimal
+            self.to_big_decimal().cmp(&other.to_big_decimal())
+        }
+    }
+}
+
+impl std::hash::Hash for Decimal {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        // Hash the BigDecimal representation.
+        //
+        // IMPORTANT: Unlike Java's BigDecimal, Rust's bigdecimal crate 
normalizes
+        // before hashing, so hash(1.0) == hash(1.00). Combined with our 
numeric
+        // equality (1.0 == 1.00), this CORRECTLY satisfies the hash/equals 
contract.
+        //
+        // This is BETTER than Java's implementation which has a hash/equals 
violation:
+        // - Java: equals(1.0, 1.00) = true, but hashCode(1.0) != 
hashCode(1.00)
+        // - Rust: equals(1.0, 1.00) = true, and hash(1.0) == hash(1.00) ✓
+        //
+        // Result: HashMap/HashSet will work correctly even if you create 
Decimals
+        // with different scales for the same numeric value (though this is 
rare in
+        // practice since decimals are schema-driven with fixed 
precision/scale).
+        self.to_big_decimal().hash(state);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_precision_calculation() {
+        // Zero is special case
+        assert_eq!(Decimal::compute_precision(&BigInt::from(0)), 1);
+
+        // Must count ALL digits including trailing zeros (matches Java 
BigDecimal.precision())
+        assert_eq!(Decimal::compute_precision(&BigInt::from(10)), 2);
+        assert_eq!(Decimal::compute_precision(&BigInt::from(100)), 3);
+        assert_eq!(Decimal::compute_precision(&BigInt::from(12300)), 5);
+        assert_eq!(
+            Decimal::compute_precision(&BigInt::from(10000000000i64)),
+            11
+        );
+
+        // Test the case: value=1, scale=10 → unscaled=10000000000 (11 digits)
+        let bd = BigDecimal::new(BigInt::from(1), 0);
+        assert!(
+            Decimal::from_big_decimal(bd.clone(), 1, 10).is_err(),
+            "Should reject: unscaled 10000000000 has 11 digits, precision=1 is 
too small"
+        );
+        assert!(
+            Decimal::from_big_decimal(bd, 11, 10).is_ok(),
+            "Should accept with correct precision=11"
+        );
+    }
+
+    /// Test precision validation boundaries
+    #[test]
+    fn test_precision_validation() {
+        let test_cases = vec![
+            (10i64, 1, 2),            // 1.0 → unscaled: 10 (2 digits)
+            (100i64, 2, 3),           // 1.00 → unscaled: 100 (3 digits)
+            (10000000000i64, 10, 11), // 1.0000000000 → unscaled: 10000000000 
(11 digits)
+        ];
+
+        for (unscaled, scale, min_precision) in test_cases {
+            let bd = BigDecimal::new(BigInt::from(unscaled), scale as i64);
+
+            // Reject if precision too small
+            assert!(Decimal::from_big_decimal(bd.clone(), min_precision - 1, 
scale).is_err());
+            // Accept with correct precision
+            assert!(Decimal::from_big_decimal(bd, min_precision, 
scale).is_ok());
+        }
+
+        // i64::MAX has 19 digits, should reject with precision=5
+        let bd = BigDecimal::new(BigInt::from(i64::MAX), 0);
+        assert!(Decimal::from_big_decimal(bd, 5, 0).is_err());
+    }
+
+    /// Test creation and basic operations for both compact and non-compact 
decimals
+    #[test]
+    fn test_creation_and_representation() {
+        // Compact (precision ≤ 18): from unscaled long
+        let compact = Decimal::from_unscaled_long(12345, 10, 2).unwrap();
+        assert_eq!(compact.precision(), 10);
+        assert_eq!(compact.scale(), 2);
+        assert!(compact.is_compact());
+        assert_eq!(compact.to_unscaled_long().unwrap(), 12345);
+        assert_eq!(compact.to_big_decimal().to_string(), "123.45");
+
+        // Non-compact (precision > 18): from BigDecimal
+        let bd = BigDecimal::new(BigInt::from(12345), 0);
+        let non_compact = Decimal::from_big_decimal(bd, 28, 0).unwrap();
+        assert_eq!(non_compact.precision(), 28);
+        assert!(!non_compact.is_compact());
+        assert_eq!(
+            non_compact.to_unscaled_bytes(),
+            BigInt::from(12345).to_signed_bytes_be()
+        );
+
+        // Test compact boundary
+        assert!(Decimal::is_compact_precision(18));
+        assert!(!Decimal::is_compact_precision(19));
+
+        // Test rounding during creation
+        let bd = BigDecimal::new(BigInt::from(12345), 3); // 12.345
+        let rounded = Decimal::from_big_decimal(bd, 10, 2).unwrap();
+        assert_eq!(rounded.to_unscaled_long().unwrap(), 1235); // 12.35
+    }
+
+    /// Test serialization round-trip (unscaled bytes)
+    #[test]
+    fn test_serialization_roundtrip() {
+        // Compact decimal
+        let bd1 = BigDecimal::new(BigInt::from(1314567890123i64), 5); // 
13145678.90123
+        let decimal1 = Decimal::from_big_decimal(bd1.clone(), 15, 5).unwrap();
+        let (unscaled1, _) = bd1.as_bigint_and_exponent();
+        let from_bytes1 =
+            Decimal::from_unscaled_bytes(&unscaled1.to_signed_bytes_be(), 15, 
5).unwrap();
+        assert_eq!(from_bytes1, decimal1);
+        assert_eq!(
+            from_bytes1.to_unscaled_bytes(),
+            unscaled1.to_signed_bytes_be()
+        );
+
+        // Non-compact decimal
+        let bd2 = BigDecimal::new(BigInt::from(12345678900987654321i128), 10);
+        let decimal2 = Decimal::from_big_decimal(bd2.clone(), 23, 10).unwrap();
+        let (unscaled2, _) = bd2.as_bigint_and_exponent();
+        let from_bytes2 =
+            Decimal::from_unscaled_bytes(&unscaled2.to_signed_bytes_be(), 23, 
10).unwrap();
+        assert_eq!(from_bytes2, decimal2);
+        assert_eq!(
+            from_bytes2.to_unscaled_bytes(),
+            unscaled2.to_signed_bytes_be()
+        );
+    }
+
+    /// Test numeric equality and ordering (matches Java semantics)
+    #[test]
+    fn test_equality_and_ordering() {

Review Comment:
   nit: UT should also include negative value when testing ordering 



##########
crates/fluss/src/metadata/json_serde.rs:
##########
@@ -202,29 +202,51 @@ impl JsonSerde for DataType {
                     .get(Self::FIELD_NAME_SCALE)
                     .and_then(|v| v.as_u64())
                     .unwrap_or(0) as u32;
-                DataTypes::decimal(precision, scale)
+                DataType::Decimal(
+                    crate::metadata::datatype::DecimalType::try_with_nullable(
+                        true, precision, scale,
+                    )
+                    .map_err(|e| Error::JsonSerdeError {
+                        message: format!("Invalid DECIMAL parameters: {}", e),
+                    })?,
+                )
             }
             "DATE" => DataTypes::date(),
             "TIME_WITHOUT_TIME_ZONE" => {
                 let precision = node
                     .get(Self::FIELD_NAME_PRECISION)
                     .and_then(|v| v.as_u64())
                     .unwrap_or(0) as u32;
-                DataTypes::time_with_precision(precision)
+                DataType::Time(
+                    
crate::metadata::datatype::TimeType::try_with_nullable(true, precision)
+                        .map_err(|e| Error::JsonSerdeError {
+                            message: format!("Invalid TIME precision: {}", e),

Review Comment:
   Nit, for the purpose of easier debugging and as we're matching Json field 
value of `TIME_WITHOUT_TIME_ZONE`, should this be saying `Invalid 
TIME_WITHOUT_TIME_ZONE precision` instead? 



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -1162,21 +1163,68 @@ mod tests {
     }
 
     #[test]
-    #[should_panic(expected = "Invalid precision value for TimeType: 10")]
+    fn test_decimal_invalid_precision() {

Review Comment:
   Should we have these in `datatype.rs`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to