leekeiabstraction commented on code in PR #175:
URL: https://github.com/apache/fluss-rust/pull/175#discussion_r2702206726


##########
crates/fluss/src/row/binary/binary_writer.rs:
##########
@@ -30,36 +30,48 @@ pub trait BinaryWriter {
     /// Set null to this field
     fn set_null_at(&mut self, pos: usize);
 
-    fn write_boolean(&mut self, value: bool);
+    fn write_boolean(&mut self, value: bool) -> Result<()>;
 
-    fn write_byte(&mut self, value: u8);
+    fn write_byte(&mut self, value: u8) -> Result<()>;
 
-    fn write_bytes(&mut self, value: &[u8]);
+    fn write_bytes(&mut self, value: &[u8]) -> Result<()>;
 
-    fn write_char(&mut self, value: &str, length: usize);
+    fn write_char(&mut self, value: &str, length: usize) -> Result<()>;
 
-    fn write_string(&mut self, value: &str);
+    fn write_string(&mut self, value: &str) -> Result<()>;
 
-    fn write_short(&mut self, value: i16);
+    fn write_short(&mut self, value: i16) -> Result<()>;
 
-    fn write_int(&mut self, value: i32);
+    fn write_int(&mut self, value: i32) -> Result<()>;
 
-    fn write_long(&mut self, value: i64);
+    fn write_long(&mut self, value: i64) -> Result<()>;
 
-    fn write_float(&mut self, value: f32);
+    fn write_float(&mut self, value: f32) -> Result<()>;
 
-    fn write_double(&mut self, value: f64);
+    fn write_double(&mut self, value: f64) -> Result<()>;
 
-    fn write_binary(&mut self, bytes: &[u8], length: usize);
+    fn write_binary(&mut self, bytes: &[u8], length: usize) -> Result<()>;
 
-    // TODO Decimal type
-    // fn write_decimal(&mut self, pos: i32, value: f64);
+    fn write_decimal(
+        &mut self,
+        value: &bigdecimal::BigDecimal,
+        precision: u32,
+        scale: u32,
+    ) -> Result<()>;
 
-    // TODO Timestamp type
-    // fn write_timestamp_ntz(&mut self, pos: i32, value: i64);
+    fn write_time(&mut self, value: i32, precision: u32) -> Result<()>;
 
-    // TODO Timestamp type
-    // fn write_timestamp_ltz(&mut self, pos: i32, value: i64);
+    fn write_timestamp_ntz(
+        &mut self,
+        value: &crate::row::datum::Timestamp,

Review Comment:
   Let's call this struct TimestampNtz to be consistent with Java side.



##########
crates/fluss/src/row/binary/binary_writer.rs:
##########
@@ -147,6 +164,64 @@ impl InnerValueWriter {
             DataType::BigInt(_) => Ok(InnerValueWriter::BigInt),
             DataType::Float(_) => Ok(InnerValueWriter::Float),
             DataType::Double(_) => Ok(InnerValueWriter::Double),
+            DataType::Decimal(d) => {
+                let precision = d.precision();
+                let scale = d.scale();
+                if !(1..=38).contains(&precision) {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Decimal precision must be between 1 and 38 (both 
inclusive), got: {}",
+                            precision
+                        ),
+                    });
+                }
+                if scale > precision {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Decimal scale must be between 0 and the precision 
{} (both inclusive), got: {}",
+                            precision, scale
+                        ),
+                    });
+                }

Review Comment:
   Suggest that we shift left on these and Err on DataType instantiation instead



##########
crates/fluss/src/row/compacted/compacted_row_writer.rs:
##########
@@ -89,64 +108,250 @@ impl BinaryWriter for CompactedRowWriter {
         self.buffer[byte_index] |= 1u8 << bit;
     }
 
-    fn write_boolean(&mut self, value: bool) {
+    fn write_boolean(&mut self, value: bool) -> Result<()> {
         let b = if value { 1u8 } else { 0u8 };
-        self.write_raw(&[b]);
+        self.write_raw(&[b])
     }
 
-    fn write_byte(&mut self, value: u8) {
-        self.write_raw(&[value]);
+    fn write_byte(&mut self, value: u8) -> Result<()> {
+        self.write_raw(&[value])
     }
 
-    fn write_bytes(&mut self, value: &[u8]) {
-        let len_i32 =
-            i32::try_from(value.len()).expect("byte slice too large to encode 
length as i32");
-        self.write_int(len_i32);
-        self.write_raw(value);
+    fn write_bytes(&mut self, value: &[u8]) -> Result<()> {
+        let len_i32 = i32::try_from(value.len()).map_err(|_| 
Error::IllegalArgument {
+            message: format!(
+                "Byte slice too large to encode length as i32: {} bytes 
exceeds i32::MAX",
+                value.len()
+            ),
+        })?;
+        self.write_int(len_i32)?;
+        self.write_raw(value)
     }
 
-    fn write_char(&mut self, value: &str, _length: usize) {
+    fn write_char(&mut self, value: &str, _length: usize) -> Result<()> {
         // TODO: currently, we encoding CHAR(length) as the same with STRING, 
the length info can be
         //  omitted and the bytes length should be enforced in the future.
-        self.write_string(value);
+        self.write_string(value)
     }
 
-    fn write_string(&mut self, value: &str) {
-        self.write_bytes(value.as_ref());
+    fn write_string(&mut self, value: &str) -> Result<()> {
+        self.write_bytes(value.as_ref())
     }
 
-    fn write_short(&mut self, value: i16) {
-        self.write_raw(&value.to_ne_bytes());
+    fn write_short(&mut self, value: i16) -> Result<()> {
+        // Use native endianness to match Java's UnsafeUtils.putShort behavior
+        // Java uses sun.misc.Unsafe which writes in native byte order 
(typically LE on x86/ARM)
+        self.write_raw(&value.to_ne_bytes())
     }
 
-    fn write_int(&mut self, value: i32) {
+    fn write_int(&mut self, value: i32) -> Result<()> {
         self.ensure_capacity(Self::MAX_INT_SIZE);
         let bytes_written =
             write_unsigned_varint_to_slice(value as u32, &mut 
self.buffer[self.position..]);
         self.position += bytes_written;
+        Ok(())
     }
 
-    fn write_long(&mut self, value: i64) {
+    fn write_long(&mut self, value: i64) -> Result<()> {
         self.ensure_capacity(Self::MAX_LONG_SIZE);
         let bytes_written =
             write_unsigned_varint_u64_to_slice(value as u64, &mut 
self.buffer[self.position..]);
         self.position += bytes_written;
+        Ok(())
     }
-    fn write_float(&mut self, value: f32) {
-        self.write_raw(&value.to_ne_bytes());
+
+    fn write_float(&mut self, value: f32) -> Result<()> {
+        // Use native endianness to match Java's UnsafeUtils.putFloat behavior
+        self.write_raw(&value.to_ne_bytes())
     }
 
-    fn write_double(&mut self, value: f64) {
-        self.write_raw(&value.to_ne_bytes());
+    fn write_double(&mut self, value: f64) -> Result<()> {
+        // Use native endianness to match Java's UnsafeUtils.putDouble behavior
+        self.write_raw(&value.to_ne_bytes())
     }
 
-    fn write_binary(&mut self, bytes: &[u8], length: usize) {
+    fn write_binary(&mut self, bytes: &[u8], length: usize) -> Result<()> {
         // TODO: currently, we encoding BINARY(length) as the same with BYTES, 
the length info can
         //  be omitted and the bytes length should be enforced in the future.
-        self.write_bytes(&bytes[..length.min(bytes.len())]);
+        self.write_bytes(&bytes[..length.min(bytes.len())])
     }
 
     fn complete(&mut self) {
         // do nothing
     }
+
+    fn write_decimal(
+        &mut self,
+        value: &bigdecimal::BigDecimal,
+        precision: u32,
+        scale: u32,
+    ) -> Result<()> {
+        const MAX_COMPACT_PRECISION: u32 = 18;
+        use bigdecimal::rounding::RoundingMode;
+
+        // Step 1 (Java: Decimal.fromBigDecimal): rescale + validate precision
+        // bd = bd.setScale(scale, RoundingMode.HALF_UP);
+        let scaled = value.with_scale_round(scale as i64, 
RoundingMode::HalfUp);
+
+        // In bigdecimal, after scaling, the "unscaled value" is exactly the 
bigint mantissa.
+        // That corresponds to Java: 
scaled.movePointRight(scale).toBigIntegerExact().
+        let (unscaled, exp) = scaled.as_bigint_and_exponent();
+
+        // Sanity check
+        debug_assert_eq!(
+            exp, scale as i64,
+            "Scaled decimal exponent ({}) != expected scale ({})",
+            exp, scale
+        );
+
+        // Java validates: if (bd.precision() > precision) return null;
+        // Java BigDecimal.precision() == number of base-10 digits in the 
unscaled value (sign ignored),
+        // with 0 having precision 1, and trailing zeros stripped.
+        let prec = java_decimal_precision(&unscaled);
+        if prec > precision as usize {
+            return Err(Error::IllegalArgument {
+                message: format!(
+                    "Decimal precision overflow after rescale: scaled={} has 
{} digits but precision is {} (orig={})",
+                    scaled, prec, precision, value
+                ),
+            });
+        }
+
+        // Step 2 (Java: CompactedRowWriter.writeDecimal): serialize 
precomputed unscaled representation
+        if precision <= MAX_COMPACT_PRECISION {
+            let unscaled_i64 = i64::try_from(&unscaled).map_err(|_| 
Error::IllegalArgument {
+                message: format!(
+                    "Decimal mantissa exceeds i64 range for compact precision 
{}: unscaled={} (scaled={}, orig={})",
+                    precision, unscaled, scaled, value
+                ),
+            })?;
+            self.write_long(unscaled_i64)
+        } else {
+            // Java: BigInteger.toByteArray() (two's complement big-endian, 
minimal length)
+            let bytes = unscaled.to_signed_bytes_be();
+            self.write_bytes(&bytes)
+        }
+    }
+
+    fn write_time(&mut self, value: i32, _precision: u32) -> Result<()> {
+        // TIME is always encoded as i32 (milliseconds since midnight) 
regardless of precision
+        self.write_int(value)
+    }
+
+    fn write_timestamp_ntz(
+        &mut self,
+        value: &crate::row::datum::Timestamp,
+        precision: u32,
+    ) -> Result<()> {
+        if crate::row::datum::Timestamp::is_compact(precision) {
+            // Compact: write only milliseconds
+            self.write_long(value.get_millisecond())?;
+        } else {
+            // Non-compact: write milliseconds + nanoOfMillisecond
+            self.write_long(value.get_millisecond())?;
+            self.write_int(value.get_nano_of_millisecond())?;
+        }
+        Ok(())
+    }
+
+    fn write_timestamp_ltz(
+        &mut self,
+        value: &crate::row::datum::TimestampLtz,
+        precision: u32,
+    ) -> Result<()> {
+        if crate::row::datum::TimestampLtz::is_compact(precision) {
+            // Compact: write only epoch milliseconds
+            self.write_long(value.get_epoch_millisecond())?;
+        } else {
+            // Non-compact: write epoch milliseconds + nanoOfMillisecond
+            self.write_long(value.get_epoch_millisecond())?;
+            self.write_int(value.get_nano_of_millisecond())?;

Review Comment:
   @luoyuxia Might need your help in reviewing this.
   
   Rust side implementation looks correct but Java side does 
`setOffsetAndSize(pos, cursor, value.getNanoOfMillisecond());`
   
   The Java implementation of `setOffSetAndSize()` is as follow. 
   
   ```
       public void setOffsetAndSize(int pos, int offset, long size) {
           final long offsetAndSize = ((long) offset << 32) | size;
           segment.putLong(getElementOffset(pos, 8), offsetAndSize);
       }
   ```
   
   Do we need to replicate the offset writing behaviour in Rust side?



##########
crates/fluss/src/row/binary/binary_writer.rs:
##########
@@ -30,36 +30,48 @@ pub trait BinaryWriter {
     /// Set null to this field
     fn set_null_at(&mut self, pos: usize);
 
-    fn write_boolean(&mut self, value: bool);
+    fn write_boolean(&mut self, value: bool) -> Result<()>;
 
-    fn write_byte(&mut self, value: u8);
+    fn write_byte(&mut self, value: u8) -> Result<()>;
 
-    fn write_bytes(&mut self, value: &[u8]);
+    fn write_bytes(&mut self, value: &[u8]) -> Result<()>;
 
-    fn write_char(&mut self, value: &str, length: usize);
+    fn write_char(&mut self, value: &str, length: usize) -> Result<()>;
 
-    fn write_string(&mut self, value: &str);
+    fn write_string(&mut self, value: &str) -> Result<()>;
 
-    fn write_short(&mut self, value: i16);
+    fn write_short(&mut self, value: i16) -> Result<()>;
 
-    fn write_int(&mut self, value: i32);
+    fn write_int(&mut self, value: i32) -> Result<()>;
 
-    fn write_long(&mut self, value: i64);
+    fn write_long(&mut self, value: i64) -> Result<()>;
 
-    fn write_float(&mut self, value: f32);
+    fn write_float(&mut self, value: f32) -> Result<()>;
 
-    fn write_double(&mut self, value: f64);
+    fn write_double(&mut self, value: f64) -> Result<()>;
 
-    fn write_binary(&mut self, bytes: &[u8], length: usize);
+    fn write_binary(&mut self, bytes: &[u8], length: usize) -> Result<()>;
 
-    // TODO Decimal type
-    // fn write_decimal(&mut self, pos: i32, value: f64);
+    fn write_decimal(
+        &mut self,
+        value: &bigdecimal::BigDecimal,
+        precision: u32,
+        scale: u32,
+    ) -> Result<()>;
 
-    // TODO Timestamp type
-    // fn write_timestamp_ntz(&mut self, pos: i32, value: i64);
+    fn write_time(&mut self, value: i32, precision: u32) -> Result<()>;

Review Comment:
   QQ: There's no writeTime() on Java side, why do we have this on Rust side?



##########
crates/fluss/src/row/binary/binary_writer.rs:
##########
@@ -30,36 +30,48 @@ pub trait BinaryWriter {
     /// Set null to this field
     fn set_null_at(&mut self, pos: usize);
 
-    fn write_boolean(&mut self, value: bool);
+    fn write_boolean(&mut self, value: bool) -> Result<()>;
 
-    fn write_byte(&mut self, value: u8);
+    fn write_byte(&mut self, value: u8) -> Result<()>;
 
-    fn write_bytes(&mut self, value: &[u8]);
+    fn write_bytes(&mut self, value: &[u8]) -> Result<()>;
 
-    fn write_char(&mut self, value: &str, length: usize);
+    fn write_char(&mut self, value: &str, length: usize) -> Result<()>;
 
-    fn write_string(&mut self, value: &str);
+    fn write_string(&mut self, value: &str) -> Result<()>;
 
-    fn write_short(&mut self, value: i16);
+    fn write_short(&mut self, value: i16) -> Result<()>;
 
-    fn write_int(&mut self, value: i32);
+    fn write_int(&mut self, value: i32) -> Result<()>;
 
-    fn write_long(&mut self, value: i64);
+    fn write_long(&mut self, value: i64) -> Result<()>;
 
-    fn write_float(&mut self, value: f32);
+    fn write_float(&mut self, value: f32) -> Result<()>;
 
-    fn write_double(&mut self, value: f64);
+    fn write_double(&mut self, value: f64) -> Result<()>;
 
-    fn write_binary(&mut self, bytes: &[u8], length: usize);
+    fn write_binary(&mut self, bytes: &[u8], length: usize) -> Result<()>;
 
-    // TODO Decimal type
-    // fn write_decimal(&mut self, pos: i32, value: f64);
+    fn write_decimal(
+        &mut self,
+        value: &bigdecimal::BigDecimal,
+        precision: u32,
+        scale: u32,

Review Comment:
   QQ: Why do we have scale here? Should we have it consistent with Java side 
where there's only precision?
   
   FWIW, I also see the value of what you're doing where if we specify 
precision, why not scale as well?



##########
crates/fluss/src/row/binary/binary_writer.rs:
##########
@@ -147,6 +164,64 @@ impl InnerValueWriter {
             DataType::BigInt(_) => Ok(InnerValueWriter::BigInt),
             DataType::Float(_) => Ok(InnerValueWriter::Float),
             DataType::Double(_) => Ok(InnerValueWriter::Double),
+            DataType::Decimal(d) => {
+                let precision = d.precision();
+                let scale = d.scale();
+                if !(1..=38).contains(&precision) {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Decimal precision must be between 1 and 38 (both 
inclusive), got: {}",
+                            precision
+                        ),
+                    });
+                }
+                if scale > precision {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Decimal scale must be between 0 and the precision 
{} (both inclusive), got: {}",
+                            precision, scale
+                        ),
+                    });
+                }
+                Ok(InnerValueWriter::Decimal(precision, scale))
+            }
+            DataType::Date(_) => Ok(InnerValueWriter::Date),
+            DataType::Time(t) => {
+                let precision = t.precision();
+                if precision > 9 {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Time precision must be between 0 and 9 (both 
inclusive), got: {}",
+                            precision
+                        ),
+                    });
+                }

Review Comment:
   Suggest that we shift left on this and Err on DataType instantiation instead



##########
crates/fluss/src/row/binary/binary_writer.rs:
##########
@@ -147,6 +164,64 @@ impl InnerValueWriter {
             DataType::BigInt(_) => Ok(InnerValueWriter::BigInt),
             DataType::Float(_) => Ok(InnerValueWriter::Float),
             DataType::Double(_) => Ok(InnerValueWriter::Double),
+            DataType::Decimal(d) => {
+                let precision = d.precision();
+                let scale = d.scale();
+                if !(1..=38).contains(&precision) {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Decimal precision must be between 1 and 38 (both 
inclusive), got: {}",
+                            precision
+                        ),
+                    });
+                }
+                if scale > precision {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Decimal scale must be between 0 and the precision 
{} (both inclusive), got: {}",
+                            precision, scale
+                        ),
+                    });
+                }
+                Ok(InnerValueWriter::Decimal(precision, scale))
+            }
+            DataType::Date(_) => Ok(InnerValueWriter::Date),
+            DataType::Time(t) => {
+                let precision = t.precision();
+                if precision > 9 {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Time precision must be between 0 and 9 (both 
inclusive), got: {}",
+                            precision
+                        ),
+                    });
+                }
+                Ok(InnerValueWriter::Time(precision))
+            }
+            DataType::Timestamp(t) => {
+                let precision = t.precision();
+                if precision > 9 {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Timestamp precision must be between 0 and 9 (both 
inclusive), got: {}",
+                            precision
+                        ),
+                    });
+                }

Review Comment:
   Suggest that we shift left on this and Err on DataType instantiation instead



##########
crates/fluss/src/row/binary/binary_writer.rs:
##########
@@ -147,6 +164,64 @@ impl InnerValueWriter {
             DataType::BigInt(_) => Ok(InnerValueWriter::BigInt),
             DataType::Float(_) => Ok(InnerValueWriter::Float),
             DataType::Double(_) => Ok(InnerValueWriter::Double),
+            DataType::Decimal(d) => {
+                let precision = d.precision();
+                let scale = d.scale();
+                if !(1..=38).contains(&precision) {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Decimal precision must be between 1 and 38 (both 
inclusive), got: {}",
+                            precision
+                        ),
+                    });
+                }
+                if scale > precision {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Decimal scale must be between 0 and the precision 
{} (both inclusive), got: {}",
+                            precision, scale
+                        ),
+                    });
+                }
+                Ok(InnerValueWriter::Decimal(precision, scale))
+            }
+            DataType::Date(_) => Ok(InnerValueWriter::Date),
+            DataType::Time(t) => {
+                let precision = t.precision();
+                if precision > 9 {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Time precision must be between 0 and 9 (both 
inclusive), got: {}",
+                            precision
+                        ),
+                    });
+                }
+                Ok(InnerValueWriter::Time(precision))
+            }
+            DataType::Timestamp(t) => {
+                let precision = t.precision();
+                if precision > 9 {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Timestamp precision must be between 0 and 9 (both 
inclusive), got: {}",
+                            precision
+                        ),
+                    });
+                }
+                Ok(InnerValueWriter::TimestampNtz(precision))
+            }
+            DataType::TimestampLTz(t) => {
+                let precision = t.precision();
+                if precision > 9 {
+                    return Err(IllegalArgument {
+                        message: format!(
+                            "Timestamp with local time zone precision must be 
between 0 and 9 (both inclusive), got: {}",
+                            precision
+                        ),
+                    });
+                }

Review Comment:
   Suggest that we shift left on this and Err on DataType instantiation instead



##########
crates/fluss/src/row/compacted/compacted_row_reader.rs:
##########
@@ -97,12 +99,92 @@ impl<'a> CompactedRowDeserializer<'a> {
                     let (val, next) = reader.read_bytes(cursor);
                     (Datum::Blob(val.into()), next)
                 }
-                _ => panic!("unsupported DataType in 
CompactedRowDeserializer"),
+                DataType::Decimal(decimal_type) => {
+                    let precision = decimal_type.precision();
+                    let scale = decimal_type.scale();
+                    if precision <= 18 {

Review Comment:
   Magic number, should we define/use a const in Decimal data type instead? Or 
have a is_compact() function defined?



##########
crates/fluss/src/row/compacted/compacted_row_writer.rs:
##########
@@ -89,64 +108,250 @@ impl BinaryWriter for CompactedRowWriter {
         self.buffer[byte_index] |= 1u8 << bit;
     }
 
-    fn write_boolean(&mut self, value: bool) {
+    fn write_boolean(&mut self, value: bool) -> Result<()> {
         let b = if value { 1u8 } else { 0u8 };
-        self.write_raw(&[b]);
+        self.write_raw(&[b])
     }
 
-    fn write_byte(&mut self, value: u8) {
-        self.write_raw(&[value]);
+    fn write_byte(&mut self, value: u8) -> Result<()> {
+        self.write_raw(&[value])
     }
 
-    fn write_bytes(&mut self, value: &[u8]) {
-        let len_i32 =
-            i32::try_from(value.len()).expect("byte slice too large to encode 
length as i32");
-        self.write_int(len_i32);
-        self.write_raw(value);
+    fn write_bytes(&mut self, value: &[u8]) -> Result<()> {
+        let len_i32 = i32::try_from(value.len()).map_err(|_| 
Error::IllegalArgument {
+            message: format!(
+                "Byte slice too large to encode length as i32: {} bytes 
exceeds i32::MAX",
+                value.len()
+            ),
+        })?;
+        self.write_int(len_i32)?;
+        self.write_raw(value)
     }
 
-    fn write_char(&mut self, value: &str, _length: usize) {
+    fn write_char(&mut self, value: &str, _length: usize) -> Result<()> {
         // TODO: currently, we encoding CHAR(length) as the same with STRING, 
the length info can be
         //  omitted and the bytes length should be enforced in the future.
-        self.write_string(value);
+        self.write_string(value)
     }
 
-    fn write_string(&mut self, value: &str) {
-        self.write_bytes(value.as_ref());
+    fn write_string(&mut self, value: &str) -> Result<()> {
+        self.write_bytes(value.as_ref())
     }
 
-    fn write_short(&mut self, value: i16) {
-        self.write_raw(&value.to_ne_bytes());
+    fn write_short(&mut self, value: i16) -> Result<()> {
+        // Use native endianness to match Java's UnsafeUtils.putShort behavior
+        // Java uses sun.misc.Unsafe which writes in native byte order 
(typically LE on x86/ARM)
+        self.write_raw(&value.to_ne_bytes())
     }
 
-    fn write_int(&mut self, value: i32) {
+    fn write_int(&mut self, value: i32) -> Result<()> {
         self.ensure_capacity(Self::MAX_INT_SIZE);
         let bytes_written =
             write_unsigned_varint_to_slice(value as u32, &mut 
self.buffer[self.position..]);
         self.position += bytes_written;
+        Ok(())
     }
 
-    fn write_long(&mut self, value: i64) {
+    fn write_long(&mut self, value: i64) -> Result<()> {
         self.ensure_capacity(Self::MAX_LONG_SIZE);
         let bytes_written =
             write_unsigned_varint_u64_to_slice(value as u64, &mut 
self.buffer[self.position..]);
         self.position += bytes_written;
+        Ok(())
     }
-    fn write_float(&mut self, value: f32) {
-        self.write_raw(&value.to_ne_bytes());
+
+    fn write_float(&mut self, value: f32) -> Result<()> {
+        // Use native endianness to match Java's UnsafeUtils.putFloat behavior
+        self.write_raw(&value.to_ne_bytes())
     }
 
-    fn write_double(&mut self, value: f64) {
-        self.write_raw(&value.to_ne_bytes());
+    fn write_double(&mut self, value: f64) -> Result<()> {
+        // Use native endianness to match Java's UnsafeUtils.putDouble behavior
+        self.write_raw(&value.to_ne_bytes())
     }
 
-    fn write_binary(&mut self, bytes: &[u8], length: usize) {
+    fn write_binary(&mut self, bytes: &[u8], length: usize) -> Result<()> {
         // TODO: currently, we encoding BINARY(length) as the same with BYTES, 
the length info can
         //  be omitted and the bytes length should be enforced in the future.
-        self.write_bytes(&bytes[..length.min(bytes.len())]);
+        self.write_bytes(&bytes[..length.min(bytes.len())])
     }
 
     fn complete(&mut self) {
         // do nothing
     }
+
+    fn write_decimal(
+        &mut self,
+        value: &bigdecimal::BigDecimal,
+        precision: u32,
+        scale: u32,
+    ) -> Result<()> {
+        const MAX_COMPACT_PRECISION: u32 = 18;
+        use bigdecimal::rounding::RoundingMode;
+
+        // Step 1 (Java: Decimal.fromBigDecimal): rescale + validate precision
+        // bd = bd.setScale(scale, RoundingMode.HALF_UP);
+        let scaled = value.with_scale_round(scale as i64, 
RoundingMode::HalfUp);
+
+        // In bigdecimal, after scaling, the "unscaled value" is exactly the 
bigint mantissa.
+        // That corresponds to Java: 
scaled.movePointRight(scale).toBigIntegerExact().
+        let (unscaled, exp) = scaled.as_bigint_and_exponent();
+
+        // Sanity check
+        debug_assert_eq!(
+            exp, scale as i64,
+            "Scaled decimal exponent ({}) != expected scale ({})",
+            exp, scale
+        );
+
+        // Java validates: if (bd.precision() > precision) return null;
+        // Java BigDecimal.precision() == number of base-10 digits in the 
unscaled value (sign ignored),
+        // with 0 having precision 1, and trailing zeros stripped.
+        let prec = java_decimal_precision(&unscaled);
+        if prec > precision as usize {
+            return Err(Error::IllegalArgument {
+                message: format!(
+                    "Decimal precision overflow after rescale: scaled={} has 
{} digits but precision is {} (orig={})",
+                    scaled, prec, precision, value
+                ),
+            });
+        }

Review Comment:
   Should we encapsulate these within a utility class? Java side seems to use 
this logic in a few places.



##########
crates/fluss/src/row/compacted/compacted_row_writer.rs:
##########
@@ -89,64 +108,250 @@ impl BinaryWriter for CompactedRowWriter {
         self.buffer[byte_index] |= 1u8 << bit;
     }
 
-    fn write_boolean(&mut self, value: bool) {
+    fn write_boolean(&mut self, value: bool) -> Result<()> {
         let b = if value { 1u8 } else { 0u8 };
-        self.write_raw(&[b]);
+        self.write_raw(&[b])
     }
 
-    fn write_byte(&mut self, value: u8) {
-        self.write_raw(&[value]);
+    fn write_byte(&mut self, value: u8) -> Result<()> {
+        self.write_raw(&[value])
     }
 
-    fn write_bytes(&mut self, value: &[u8]) {
-        let len_i32 =
-            i32::try_from(value.len()).expect("byte slice too large to encode 
length as i32");
-        self.write_int(len_i32);
-        self.write_raw(value);
+    fn write_bytes(&mut self, value: &[u8]) -> Result<()> {
+        let len_i32 = i32::try_from(value.len()).map_err(|_| 
Error::IllegalArgument {
+            message: format!(
+                "Byte slice too large to encode length as i32: {} bytes 
exceeds i32::MAX",
+                value.len()
+            ),
+        })?;
+        self.write_int(len_i32)?;
+        self.write_raw(value)
     }
 
-    fn write_char(&mut self, value: &str, _length: usize) {
+    fn write_char(&mut self, value: &str, _length: usize) -> Result<()> {
         // TODO: currently, we encoding CHAR(length) as the same with STRING, 
the length info can be
         //  omitted and the bytes length should be enforced in the future.
-        self.write_string(value);
+        self.write_string(value)
     }
 
-    fn write_string(&mut self, value: &str) {
-        self.write_bytes(value.as_ref());
+    fn write_string(&mut self, value: &str) -> Result<()> {
+        self.write_bytes(value.as_ref())
     }
 
-    fn write_short(&mut self, value: i16) {
-        self.write_raw(&value.to_ne_bytes());
+    fn write_short(&mut self, value: i16) -> Result<()> {
+        // Use native endianness to match Java's UnsafeUtils.putShort behavior
+        // Java uses sun.misc.Unsafe which writes in native byte order 
(typically LE on x86/ARM)
+        self.write_raw(&value.to_ne_bytes())
     }
 
-    fn write_int(&mut self, value: i32) {
+    fn write_int(&mut self, value: i32) -> Result<()> {
         self.ensure_capacity(Self::MAX_INT_SIZE);
         let bytes_written =
             write_unsigned_varint_to_slice(value as u32, &mut 
self.buffer[self.position..]);
         self.position += bytes_written;
+        Ok(())
     }
 
-    fn write_long(&mut self, value: i64) {
+    fn write_long(&mut self, value: i64) -> Result<()> {
         self.ensure_capacity(Self::MAX_LONG_SIZE);
         let bytes_written =
             write_unsigned_varint_u64_to_slice(value as u64, &mut 
self.buffer[self.position..]);
         self.position += bytes_written;
+        Ok(())
     }
-    fn write_float(&mut self, value: f32) {
-        self.write_raw(&value.to_ne_bytes());
+
+    fn write_float(&mut self, value: f32) -> Result<()> {
+        // Use native endianness to match Java's UnsafeUtils.putFloat behavior
+        self.write_raw(&value.to_ne_bytes())
     }
 
-    fn write_double(&mut self, value: f64) {
-        self.write_raw(&value.to_ne_bytes());
+    fn write_double(&mut self, value: f64) -> Result<()> {
+        // Use native endianness to match Java's UnsafeUtils.putDouble behavior
+        self.write_raw(&value.to_ne_bytes())
     }
 
-    fn write_binary(&mut self, bytes: &[u8], length: usize) {
+    fn write_binary(&mut self, bytes: &[u8], length: usize) -> Result<()> {
         // TODO: currently, we encoding BINARY(length) as the same with BYTES, 
the length info can
         //  be omitted and the bytes length should be enforced in the future.
-        self.write_bytes(&bytes[..length.min(bytes.len())]);
+        self.write_bytes(&bytes[..length.min(bytes.len())])
     }
 
     fn complete(&mut self) {
         // do nothing
     }
+
+    fn write_decimal(
+        &mut self,
+        value: &bigdecimal::BigDecimal,
+        precision: u32,
+        scale: u32,
+    ) -> Result<()> {
+        const MAX_COMPACT_PRECISION: u32 = 18;

Review Comment:
   Let's define this in one place as a const and use it from everywhere else



##########
crates/fluss/src/row/encode/compacted_key_encoder.rs:
##########
@@ -304,6 +307,18 @@ mod tests {
         expected.extend(vec![0x33, 0x33, 0x53, 0x41]);
         // DOUBLE: 15.21
         expected.extend(vec![0xEC, 0x51, 0xB8, 0x1E, 0x85, 0x6B, 0x2E, 0x40]);
+        // DATE: 19651 (2023-10-25) - varint encoding
+        expected.extend(vec![0xC3, 0x99, 0x01]);
+        // TIME: 34200000 (09:30:00.0) - varint encoding
+        expected.extend(vec![0xC0, 0xB3, 0xA7, 0x10]);
+        // TIMESTAMP: 1698235273182 - non-compact encoding (precision=6 > 3)
+        // Format: millis (varint i64) + nanos (varint i32)
+        // Millis: 1698235273182 = [DE 9F D7 B5 B6 31], Nanos: 0 = [00]
+        expected.extend(vec![0xDE, 0x9F, 0xD7, 0xB5, 0xB6, 0x31, 0x00]);
+        // TIMESTAMP_LTZ: 1698235273182 - same non-compact encoding
+        expected.extend(vec![0xDE, 0x9F, 0xD7, 0xB5, 0xB6, 0x31, 0x00]);
+        // DECIMAL: 12345 (representing 123.45 with scale 2) - varint encoding
+        expected.extend(vec![0xB9, 0x60]);

Review Comment:
   Would appreciate if we can have these lines ordering and values are similar 
as Java side's test here: 
https://github.com/apache/fluss/pull/2312/changes#diff-9e112c84147b71ed60974c0bf8ec82568e85e4d2e3c38dfe60c9c576e19472e3
   
   
   ```
   # DECIMAL(52) Decimal.fromUnscaledLong(9, 5, 2)
   09
   
   # DECIMAL(200): Decimal.fromBigDecimal(new BigDecimal(10), 20, 0)
   01 0A
   
   # TIMESTAMP(1): TimestampNtz.fromMillis(1698235273182L)
   DE 9F D7 B5 B6 31
   
   # TIMESTAMP(5): TimestampNtz.fromMillis(1698235273182L))
   DE 9F D7 B5 B6 31 00
   
   # TIMESTAMP_LTZ(1): TimestampLtz.fromEpochMillis(1698235273182L)
   DE 9F D7 B5 B6 31
   
   # TIMESTAMP_LTZ(5): TimestampLtz.fromEpochMillis(1698235273182L)
   DE 9F D7 B5 B6 31 00
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to