This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 350d5a9  feat: support all basic datatypes in compacted key 
encoder(continuation) (#175)
350d5a9 is described below

commit 350d5a918ff1e7c41d092fcaa6c4a7651f31602b
Author: Anton Borisov <[email protected]>
AuthorDate: Mon Jan 19 13:27:22 2026 +0000

    feat: support all basic datatypes in compacted key encoder(continuation) 
(#175)
---
 crates/fluss/Cargo.toml                            |   2 +-
 crates/fluss/src/metadata/datatype.rs              | 272 ++++++++----
 crates/fluss/src/metadata/json_serde.rs            | 112 ++++-
 crates/fluss/src/record/arrow.rs                   |  21 +-
 crates/fluss/src/row/binary/binary_writer.rs       |  57 ++-
 crates/fluss/src/row/column.rs                     | 198 ++++++++-
 .../src/row/compacted/compacted_key_writer.rs      |   7 +
 crates/fluss/src/row/compacted/compacted_row.rs    | 147 +++++--
 .../src/row/compacted/compacted_row_reader.rs      |  72 +++-
 .../src/row/compacted/compacted_row_writer.rs      | 125 +++++-
 crates/fluss/src/row/datum.rs                      | 181 +++++++-
 crates/fluss/src/row/decimal.rs                    | 477 +++++++++++++++++++++
 .../fluss/src/row/encode/compacted_key_encoder.rs  | 175 +++++---
 crates/fluss/src/row/field_getter.rs               | 101 ++++-
 crates/fluss/src/row/mod.rs                        |  63 ++-
 15 files changed, 1739 insertions(+), 271 deletions(-)

diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index 8942ffc..c3bdd44 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -48,7 +48,7 @@ tokio = { workspace = true }
 parking_lot = "0.12"
 bytes = "1.10.1"
 dashmap = "6.1.0"
-rust_decimal = "1"
+bigdecimal = { version = "0.4", features = ["serde"] }
 ordered-float = { version = "5", features = ["serde"] }
 parse-display = "0.10"
 ref-cast = "1.0"
diff --git a/crates/fluss/src/metadata/datatype.rs 
b/crates/fluss/src/metadata/datatype.rs
index f157466..e365237 100644
--- a/crates/fluss/src/metadata/datatype.rs
+++ b/crates/fluss/src/metadata/datatype.rs
@@ -453,16 +453,40 @@ impl DecimalType {
 
     pub const DEFAULT_SCALE: u32 = 0;
 
-    pub fn new(precision: u32, scale: u32) -> Self {
+    pub fn new(precision: u32, scale: u32) -> Result<Self> {
         Self::with_nullable(true, precision, scale)
     }
 
-    pub fn with_nullable(nullable: bool, precision: u32, scale: u32) -> Self {
-        DecimalType {
+    /// Create a DecimalType with validation, returning an error if parameters 
are invalid.
+    pub fn with_nullable(nullable: bool, precision: u32, scale: u32) -> 
Result<Self> {
+        // Validate precision
+        if !(Self::MIN_PRECISION..=Self::MAX_PRECISION).contains(&precision) {
+            return Err(IllegalArgument {
+                message: format!(
+                    "Decimal precision must be between {} and {} (both 
inclusive), got: {}",
+                    Self::MIN_PRECISION,
+                    Self::MAX_PRECISION,
+                    precision
+                ),
+            });
+        }
+        // Validate scale
+        // Note: MIN_SCALE is 0, and scale is u32, so scale >= MIN_SCALE is 
always true
+        if scale > precision {
+            return Err(IllegalArgument {
+                message: format!(
+                    "Decimal scale must be between {} and the precision {} 
(both inclusive), got: {}",
+                    Self::MIN_SCALE,
+                    precision,
+                    scale
+                ),
+            });
+        }
+        Ok(DecimalType {
             nullable,
             precision,
             scale,
-        }
+        })
     }
 
     pub fn precision(&self) -> u32 {
@@ -475,6 +499,7 @@ impl DecimalType {
 
     pub fn as_non_nullable(&self) -> Self {
         Self::with_nullable(false, self.precision, self.scale)
+            .expect("Invalid decimal precision or scale")
     }
 }
 
@@ -531,7 +556,7 @@ pub struct TimeType {
 
 impl TimeType {
     fn default() -> Self {
-        Self::new(Self::DEFAULT_PRECISION)
+        Self::new(Self::DEFAULT_PRECISION).expect("Invalid default time 
precision")
     }
 }
 
@@ -542,15 +567,27 @@ impl TimeType {
 
     pub const DEFAULT_PRECISION: u32 = 0;
 
-    pub fn new(precision: u32) -> Self {
+    pub fn new(precision: u32) -> Result<Self> {
         Self::with_nullable(true, precision)
     }
 
-    pub fn with_nullable(nullable: bool, precision: u32) -> Self {
-        TimeType {
+    /// Create a TimeType with validation, returning an error if precision is 
invalid.
+    pub fn with_nullable(nullable: bool, precision: u32) -> Result<Self> {
+        // Validate precision
+        if !(Self::MIN_PRECISION..=Self::MAX_PRECISION).contains(&precision) {
+            return Err(IllegalArgument {
+                message: format!(
+                    "Time precision must be between {} and {} (both 
inclusive), got: {}",
+                    Self::MIN_PRECISION,
+                    Self::MAX_PRECISION,
+                    precision
+                ),
+            });
+        }
+        Ok(TimeType {
             nullable,
             precision,
-        }
+        })
     }
 
     pub fn precision(&self) -> u32 {
@@ -558,7 +595,7 @@ impl TimeType {
     }
 
     pub fn as_non_nullable(&self) -> Self {
-        Self::with_nullable(false, self.precision)
+        Self::with_nullable(false, self.precision).expect("Invalid time 
precision")
     }
 }
 
@@ -580,7 +617,7 @@ pub struct TimestampType {
 
 impl Default for TimestampType {
     fn default() -> Self {
-        Self::new(Self::DEFAULT_PRECISION)
+        Self::new(Self::DEFAULT_PRECISION).expect("Invalid default timestamp 
precision")
     }
 }
 
@@ -591,15 +628,27 @@ impl TimestampType {
 
     pub const DEFAULT_PRECISION: u32 = 6;
 
-    pub fn new(precision: u32) -> Self {
+    pub fn new(precision: u32) -> Result<Self> {
         Self::with_nullable(true, precision)
     }
 
-    pub fn with_nullable(nullable: bool, precision: u32) -> Self {
-        TimestampType {
+    /// Create a TimestampType with validation, returning an error if 
precision is invalid.
+    pub fn with_nullable(nullable: bool, precision: u32) -> Result<Self> {
+        // Validate precision
+        if !(Self::MIN_PRECISION..=Self::MAX_PRECISION).contains(&precision) {
+            return Err(IllegalArgument {
+                message: format!(
+                    "Timestamp precision must be between {} and {} (both 
inclusive), got: {}",
+                    Self::MIN_PRECISION,
+                    Self::MAX_PRECISION,
+                    precision
+                ),
+            });
+        }
+        Ok(TimestampType {
             nullable,
             precision,
-        }
+        })
     }
 
     pub fn precision(&self) -> u32 {
@@ -607,7 +656,7 @@ impl TimestampType {
     }
 
     pub fn as_non_nullable(&self) -> Self {
-        Self::with_nullable(false, self.precision)
+        Self::with_nullable(false, self.precision).expect("Invalid timestamp 
precision")
     }
 }
 
@@ -630,6 +679,7 @@ pub struct TimestampLTzType {
 impl Default for TimestampLTzType {
     fn default() -> Self {
         Self::new(Self::DEFAULT_PRECISION)
+            .expect("Invalid default timestamp with local time zone precision")
     }
 }
 
@@ -640,15 +690,27 @@ impl TimestampLTzType {
 
     pub const DEFAULT_PRECISION: u32 = 6;
 
-    pub fn new(precision: u32) -> Self {
+    pub fn new(precision: u32) -> Result<Self> {
         Self::with_nullable(true, precision)
     }
 
-    pub fn with_nullable(nullable: bool, precision: u32) -> Self {
-        TimestampLTzType {
+    /// Create a TimestampLTzType with validation, returning an error if 
precision is invalid.
+    pub fn with_nullable(nullable: bool, precision: u32) -> Result<Self> {
+        // Validate precision
+        if !(Self::MIN_PRECISION..=Self::MAX_PRECISION).contains(&precision) {
+            return Err(IllegalArgument {
+                message: format!(
+                    "Timestamp with local time zone precision must be between 
{} and {} (both inclusive), got: {}",
+                    Self::MIN_PRECISION,
+                    Self::MAX_PRECISION,
+                    precision
+                ),
+            });
+        }
+        Ok(TimestampLTzType {
             nullable,
             precision,
-        }
+        })
     }
 
     pub fn precision(&self) -> u32 {
@@ -657,6 +719,7 @@ impl TimestampLTzType {
 
     pub fn as_non_nullable(&self) -> Self {
         Self::with_nullable(false, self.precision)
+            .expect("Invalid timestamp with local time zone precision")
     }
 }
 
@@ -985,7 +1048,7 @@ impl DataTypes {
     /// digits to the right of the decimal point in a number (=scale). `p` 
must have a value
     /// between 1 and 38 (both inclusive). `s` must have a value between 0 and 
`p` (both inclusive).
     pub fn decimal(precision: u32, scale: u32) -> DataType {
-        DataType::Decimal(DecimalType::new(precision, scale))
+        DataType::Decimal(DecimalType::new(precision, scale).expect("Invalid 
decimal parameters"))
     }
 
     pub fn date() -> DataType {
@@ -1000,7 +1063,7 @@ impl DataTypes {
     /// Data type of a time WITHOUT time zone `TIME(p)` where `p` is the 
number of digits
     /// of fractional seconds (=precision). `p` must have a value between 0 
and 9 (both inclusive).
     pub fn time_with_precision(precision: u32) -> DataType {
-        DataType::Time(TimeType::new(precision))
+        DataType::Time(TimeType::new(precision).expect("Invalid time 
precision"))
     }
 
     /// Data type of a timestamp WITHOUT time zone `TIMESTAMP` with 6 digits 
of fractional
@@ -1013,7 +1076,7 @@ impl DataTypes {
     /// of digits of fractional seconds (=precision). `p` must have a value 
between 0 and 9
     /// (both inclusive).
     pub fn timestamp_with_precision(precision: u32) -> DataType {
-        DataType::Timestamp(TimestampType::new(precision))
+        DataType::Timestamp(TimestampType::new(precision).expect("Invalid 
timestamp precision"))
     }
 
     /// Data type of a timestamp WITH time zone `TIMESTAMP WITH TIME ZONE` 
with 6 digits of
@@ -1025,7 +1088,10 @@ impl DataTypes {
     /// Data type of a timestamp WITH time zone `TIMESTAMP WITH TIME ZONE(p)` 
where `p` is the number
     /// of digits of fractional seconds (=precision). `p` must have a value 
between 0 and 9 (both inclusive).
     pub fn timestamp_ltz_with_precision(precision: u32) -> DataType {
-        DataType::TimestampLTz(TimestampLTzType::new(precision))
+        DataType::TimestampLTz(
+            TimestampLTzType::new(precision)
+                .expect("Invalid timestamp with local time zone precision"),
+        )
     }
 
     /// Data type of an array of elements with same subtype.
@@ -1100,82 +1166,56 @@ impl Display for DataField {
 }
 
 #[test]
-fn test_boolean_display() {
+fn test_primitive_types_display() {
+    // Test simple primitive types with nullable and non-nullable variants
     assert_eq!(BooleanType::new().to_string(), "BOOLEAN");
     assert_eq!(
         BooleanType::with_nullable(false).to_string(),
         "BOOLEAN NOT NULL"
     );
-}
 
-#[test]
-fn test_tinyint_display() {
     assert_eq!(TinyIntType::new().to_string(), "TINYINT");
     assert_eq!(
         TinyIntType::with_nullable(false).to_string(),
         "TINYINT NOT NULL"
     );
-}
 
-#[test]
-fn test_smallint_display() {
     assert_eq!(SmallIntType::new().to_string(), "SMALLINT");
     assert_eq!(
         SmallIntType::with_nullable(false).to_string(),
         "SMALLINT NOT NULL"
     );
-}
 
-#[test]
-fn test_int_display() {
     assert_eq!(IntType::new().to_string(), "INT");
     assert_eq!(IntType::with_nullable(false).to_string(), "INT NOT NULL");
-}
 
-#[test]
-fn test_bigint_display() {
     assert_eq!(BigIntType::new().to_string(), "BIGINT");
     assert_eq!(
         BigIntType::with_nullable(false).to_string(),
         "BIGINT NOT NULL"
     );
-}
 
-#[test]
-fn test_float_display() {
     assert_eq!(FloatType::new().to_string(), "FLOAT");
     assert_eq!(
         FloatType::with_nullable(false).to_string(),
         "FLOAT NOT NULL"
     );
-}
 
-#[test]
-fn test_double_display() {
     assert_eq!(DoubleType::new().to_string(), "DOUBLE");
     assert_eq!(
         DoubleType::with_nullable(false).to_string(),
         "DOUBLE NOT NULL"
     );
-}
 
-#[test]
-fn test_string_display() {
     assert_eq!(StringType::new().to_string(), "STRING");
     assert_eq!(
         StringType::with_nullable(false).to_string(),
         "STRING NOT NULL"
     );
-}
 
-#[test]
-fn test_date_display() {
     assert_eq!(DateType::new().to_string(), "DATE");
     assert_eq!(DateType::with_nullable(false).to_string(), "DATE NOT NULL");
-}
 
-#[test]
-fn test_bytes_display() {
     assert_eq!(BytesType::new().to_string(), "BYTES");
     assert_eq!(
         BytesType::with_nullable(false).to_string(),
@@ -1184,59 +1224,58 @@ fn test_bytes_display() {
 }
 
 #[test]
-fn test_char_display() {
+fn test_parameterized_types_display() {
+    // Test types with parameters (length, precision, scale, etc.)
     assert_eq!(CharType::new(10).to_string(), "CHAR(10)");
     assert_eq!(
         CharType::with_nullable(20, false).to_string(),
         "CHAR(20) NOT NULL"
     );
-}
 
-#[test]
-fn test_decimal_display() {
-    assert_eq!(DecimalType::new(10, 2).to_string(), "DECIMAL(10, 2)");
+    assert_eq!(BinaryType::new(100).to_string(), "BINARY(100)");
+    assert_eq!(
+        BinaryType::with_nullable(false, 256).to_string(),
+        "BINARY(256) NOT NULL"
+    );
+
     assert_eq!(
-        DecimalType::with_nullable(false, 38, 10).to_string(),
+        DecimalType::new(10, 2).unwrap().to_string(),
+        "DECIMAL(10, 2)"
+    );
+    assert_eq!(
+        DecimalType::with_nullable(false, 38, 10)
+            .unwrap()
+            .to_string(),
         "DECIMAL(38, 10) NOT NULL"
     );
-}
 
-#[test]
-fn test_time_display() {
-    assert_eq!(TimeType::new(0).to_string(), "TIME(0)");
-    assert_eq!(TimeType::new(3).to_string(), "TIME(3)");
+    assert_eq!(TimeType::new(0).unwrap().to_string(), "TIME(0)");
+    assert_eq!(TimeType::new(3).unwrap().to_string(), "TIME(3)");
     assert_eq!(
-        TimeType::with_nullable(false, 9).to_string(),
+        TimeType::with_nullable(false, 9).unwrap().to_string(),
         "TIME(9) NOT NULL"
     );
-}
 
-#[test]
-fn test_timestamp_display() {
-    assert_eq!(TimestampType::new(6).to_string(), "TIMESTAMP(6)");
-    assert_eq!(TimestampType::new(0).to_string(), "TIMESTAMP(0)");
+    assert_eq!(TimestampType::new(6).unwrap().to_string(), "TIMESTAMP(6)");
+    assert_eq!(TimestampType::new(0).unwrap().to_string(), "TIMESTAMP(0)");
     assert_eq!(
-        TimestampType::with_nullable(false, 9).to_string(),
+        TimestampType::with_nullable(false, 9).unwrap().to_string(),
         "TIMESTAMP(9) NOT NULL"
     );
-}
 
-#[test]
-fn test_timestamp_ltz_display() {
-    assert_eq!(TimestampLTzType::new(6).to_string(), "TIMESTAMP_LTZ(6)");
-    assert_eq!(TimestampLTzType::new(3).to_string(), "TIMESTAMP_LTZ(3)");
     assert_eq!(
-        TimestampLTzType::with_nullable(false, 9).to_string(),
-        "TIMESTAMP_LTZ(9) NOT NULL"
+        TimestampLTzType::new(6).unwrap().to_string(),
+        "TIMESTAMP_LTZ(6)"
     );
-}
-
-#[test]
-fn test_binary_display() {
-    assert_eq!(BinaryType::new(100).to_string(), "BINARY(100)");
     assert_eq!(
-        BinaryType::with_nullable(false, 256).to_string(),
-        "BINARY(256) NOT NULL"
+        TimestampLTzType::new(3).unwrap().to_string(),
+        "TIMESTAMP_LTZ(3)"
+    );
+    assert_eq!(
+        TimestampLTzType::with_nullable(false, 9)
+            .unwrap()
+            .to_string(),
+        "TIMESTAMP_LTZ(9) NOT NULL"
     );
 }
 
@@ -1352,3 +1391,68 @@ fn test_deeply_nested_types() {
     ));
     assert_eq!(nested.to_string(), "ARRAY<MAP<STRING, ROW<x INT, y INT>>>");
 }
+
+#[test]
+fn test_decimal_invalid_precision() {
+    // DecimalType::with_nullable should return an error for invalid precision
+    let result = DecimalType::with_nullable(true, 50, 2);
+    assert!(result.is_err());
+    assert!(
+        result
+            .unwrap_err()
+            .to_string()
+            .contains("Decimal precision must be between 1 and 38")
+    );
+}
+
+#[test]
+fn test_decimal_invalid_scale() {
+    // DecimalType::with_nullable should return an error when scale > precision
+    let result = DecimalType::with_nullable(true, 10, 15);
+    assert!(result.is_err());
+    assert!(
+        result
+            .unwrap_err()
+            .to_string()
+            .contains("Decimal scale must be between 0 and the precision 10")
+    );
+}
+
+#[test]
+fn test_time_invalid_precision() {
+    // TimeType::with_nullable should return an error for invalid precision
+    let result = TimeType::with_nullable(true, 10);
+    assert!(result.is_err());
+    assert!(
+        result
+            .unwrap_err()
+            .to_string()
+            .contains("Time precision must be between 0 and 9")
+    );
+}
+
+#[test]
+fn test_timestamp_invalid_precision() {
+    // TimestampType::with_nullable should return an error for invalid 
precision
+    let result = TimestampType::with_nullable(true, 10);
+    assert!(result.is_err());
+    assert!(
+        result
+            .unwrap_err()
+            .to_string()
+            .contains("Timestamp precision must be between 0 and 9")
+    );
+}
+
+#[test]
+fn test_timestamp_ltz_invalid_precision() {
+    // TimestampLTzType::with_nullable should return an error for invalid 
precision
+    let result = TimestampLTzType::with_nullable(true, 10);
+    assert!(result.is_err());
+    assert!(
+        result
+            .unwrap_err()
+            .to_string()
+            .contains("Timestamp with local time zone precision must be 
between 0 and 9")
+    );
+}
diff --git a/crates/fluss/src/metadata/json_serde.rs 
b/crates/fluss/src/metadata/json_serde.rs
index 7d94e19..faa5583 100644
--- a/crates/fluss/src/metadata/json_serde.rs
+++ b/crates/fluss/src/metadata/json_serde.rs
@@ -202,7 +202,12 @@ impl JsonSerde for DataType {
                     .get(Self::FIELD_NAME_SCALE)
                     .and_then(|v| v.as_u64())
                     .unwrap_or(0) as u32;
-                DataTypes::decimal(precision, scale)
+                DataType::Decimal(
+                    
crate::metadata::datatype::DecimalType::with_nullable(true, precision, scale)
+                        .map_err(|e| Error::JsonSerdeError {
+                        message: format!("Invalid DECIMAL parameters: {}", e),
+                    })?,
+                )
             }
             "DATE" => DataTypes::date(),
             "TIME_WITHOUT_TIME_ZONE" => {
@@ -210,21 +215,43 @@ impl JsonSerde for DataType {
                     .get(Self::FIELD_NAME_PRECISION)
                     .and_then(|v| v.as_u64())
                     .unwrap_or(0) as u32;
-                DataTypes::time_with_precision(precision)
+                DataType::Time(
+                    crate::metadata::datatype::TimeType::with_nullable(true, 
precision).map_err(
+                        |e| Error::JsonSerdeError {
+                            message: format!("Invalid TIME_WITHOUT_TIME_ZONE 
precision: {}", e),
+                        },
+                    )?,
+                )
             }
             "TIMESTAMP_WITHOUT_TIME_ZONE" => {
                 let precision = node
                     .get(Self::FIELD_NAME_PRECISION)
                     .and_then(|v| v.as_u64())
                     .unwrap_or(6) as u32;
-                DataTypes::timestamp_with_precision(precision)
+                DataType::Timestamp(
+                    
crate::metadata::datatype::TimestampType::with_nullable(true, precision)
+                        .map_err(|e| Error::JsonSerdeError {
+                            message: format!(
+                                "Invalid TIMESTAMP_WITHOUT_TIME_ZONE 
precision: {}",
+                                e
+                            ),
+                        })?,
+                )
             }
             "TIMESTAMP_WITH_LOCAL_TIME_ZONE" => {
                 let precision = node
                     .get(Self::FIELD_NAME_PRECISION)
                     .and_then(|v| v.as_u64())
                     .unwrap_or(6) as u32;
-                DataTypes::timestamp_ltz_with_precision(precision)
+                DataType::TimestampLTz(
+                    
crate::metadata::datatype::TimestampLTzType::with_nullable(true, precision)
+                        .map_err(|e| Error::JsonSerdeError {
+                            message: format!(
+                                "Invalid TIMESTAMP_WITH_LOCAL_TIME_ZONE 
precision: {}",
+                                e
+                            ),
+                        })?,
+                )
             }
             "BYTES" => DataTypes::bytes(),
             "BINARY" => {
@@ -689,4 +716,81 @@ mod tests {
             assert_eq!(dt, deserialized);
         }
     }
+
+    #[test]
+    fn test_invalid_datatype_validation() {
+        use serde_json::json;
+
+        // Invalid DECIMAL precision (> 38)
+        let invalid_decimal = json!({
+            "type": "DECIMAL",
+            "precision": 50,
+            "scale": 2
+        });
+        let result = DataType::deserialize_json(&invalid_decimal);
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("Invalid DECIMAL parameters")
+        );
+
+        // Invalid TIME precision (> 9)
+        let invalid_time = json!({
+            "type": "TIME_WITHOUT_TIME_ZONE",
+            "precision": 15
+        });
+        let result = DataType::deserialize_json(&invalid_time);
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("Invalid TIME_WITHOUT_TIME_ZONE precision")
+        );
+
+        // Invalid TIMESTAMP precision (> 9)
+        let invalid_timestamp = json!({
+            "type": "TIMESTAMP_WITHOUT_TIME_ZONE",
+            "precision": 20
+        });
+        let result = DataType::deserialize_json(&invalid_timestamp);
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("Invalid TIMESTAMP_WITHOUT_TIME_ZONE precision")
+        );
+
+        // Invalid TIMESTAMP_LTZ precision (> 9)
+        let invalid_timestamp_ltz = json!({
+            "type": "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+            "precision": 10
+        });
+        let result = DataType::deserialize_json(&invalid_timestamp_ltz);
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("Invalid TIMESTAMP_WITH_LOCAL_TIME_ZONE precision")
+        );
+
+        // Invalid DECIMAL scale (> precision)
+        let invalid_decimal_scale = json!({
+            "type": "DECIMAL",
+            "precision": 10,
+            "scale": 15
+        });
+        let result = DataType::deserialize_json(&invalid_decimal_scale);
+        assert!(result.is_err());
+        assert!(
+            result
+                .unwrap_err()
+                .to_string()
+                .contains("Invalid DECIMAL parameters")
+        );
+    }
 }
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index aa48376..3c46f9b 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -1061,8 +1061,7 @@ pub struct MyVec<T>(pub StreamReader<T>);
 mod tests {
     use super::*;
     use crate::error::Error;
-    use crate::metadata::DataField;
-    use crate::metadata::DataTypes;
+    use crate::metadata::{DataField, DataTypes};
 
     #[test]
     fn test_to_array_type() {
@@ -1166,24 +1165,6 @@ mod tests {
         );
     }
 
-    #[test]
-    #[should_panic(expected = "Invalid precision value for TimeType: 10")]
-    fn test_time_invalid_precision() {
-        to_arrow_type(&DataTypes::time_with_precision(10));
-    }
-
-    #[test]
-    #[should_panic(expected = "Invalid precision value for TimestampType: 10")]
-    fn test_timestamp_invalid_precision() {
-        to_arrow_type(&DataTypes::timestamp_with_precision(10));
-    }
-
-    #[test]
-    #[should_panic(expected = "Invalid precision value for TimestampLTzType: 
10")]
-    fn test_timestamp_ltz_invalid_precision() {
-        to_arrow_type(&DataTypes::timestamp_ltz_with_precision(10));
-    }
-
     #[test]
     fn test_parse_ipc_message() {
         let empty_body: &[u8] = &le_bytes(&[0xFFFFFFFF, 0x00000000]);
diff --git a/crates/fluss/src/row/binary/binary_writer.rs 
b/crates/fluss/src/row/binary/binary_writer.rs
index 9917c7b..af2765c 100644
--- a/crates/fluss/src/row/binary/binary_writer.rs
+++ b/crates/fluss/src/row/binary/binary_writer.rs
@@ -52,14 +52,20 @@ pub trait BinaryWriter {
 
     fn write_binary(&mut self, bytes: &[u8], length: usize);
 
-    // TODO Decimal type
-    // fn write_decimal(&mut self, pos: i32, value: f64);
+    fn write_decimal(&mut self, value: &crate::row::Decimal, precision: u32);
 
-    // TODO Timestamp type
-    // fn write_timestamp_ntz(&mut self, pos: i32, value: i64);
+    /// Writes a TIME value.
+    ///
+    /// Note: TIME is physically stored as an i32 (milliseconds since 
midnight).
+    /// This method exists for type safety and semantic clarity, even though 
it's
+    /// currently equivalent to `write_int()`. The precision parameter is 
accepted
+    /// for API consistency with TIMESTAMP types, though TIME encoding doesn't
+    /// currently vary by precision.
+    fn write_time(&mut self, value: i32, precision: u32);
 
-    // TODO Timestamp type
-    // fn write_timestamp_ltz(&mut self, pos: i32, value: i64);
+    fn write_timestamp_ntz(&mut self, value: &crate::row::datum::TimestampNtz, 
precision: u32);
+
+    fn write_timestamp_ltz(&mut self, value: &crate::row::datum::TimestampLtz, 
precision: u32);
 
     // TODO InternalArray, ArraySerializer
     // fn write_array(&mut self, pos: i32, value: i64);
@@ -125,7 +131,12 @@ pub enum InnerValueWriter {
     BigInt,
     Float,
     Double,
-    // TODO Decimal, Date, TimeWithoutTimeZone, TimestampWithoutTimeZone, 
TimestampWithLocalTimeZone, Array, Row
+    Decimal(u32, u32), // precision, scale
+    Date,
+    Time(u32),         // precision (not used in wire format, but kept for 
consistency)
+    TimestampNtz(u32), // precision
+    TimestampLtz(u32), // precision
+                       // TODO Array, Row
 }
 
 /// Accessor for writing the fields/elements of a binary writer during 
runtime, the
@@ -147,6 +158,23 @@ impl InnerValueWriter {
             DataType::BigInt(_) => Ok(InnerValueWriter::BigInt),
             DataType::Float(_) => Ok(InnerValueWriter::Float),
             DataType::Double(_) => Ok(InnerValueWriter::Double),
+            DataType::Decimal(d) => {
+                // Validation is done at DecimalType construction time
+                Ok(InnerValueWriter::Decimal(d.precision(), d.scale()))
+            }
+            DataType::Date(_) => Ok(InnerValueWriter::Date),
+            DataType::Time(t) => {
+                // Validation is done at TimeType construction time
+                Ok(InnerValueWriter::Time(t.precision()))
+            }
+            DataType::Timestamp(t) => {
+                // Validation is done at TimestampType construction time
+                Ok(InnerValueWriter::TimestampNtz(t.precision()))
+            }
+            DataType::TimestampLTz(t) => {
+                // Validation is done at TimestampLTzType construction time
+                Ok(InnerValueWriter::TimestampLtz(t.precision()))
+            }
             _ => unimplemented!(
                 "ValueWriter for DataType {:?} is currently not implemented",
                 data_type
@@ -194,6 +222,21 @@ impl InnerValueWriter {
             (InnerValueWriter::Double, Datum::Float64(v)) => {
                 writer.write_double(v.into_inner());
             }
+            (InnerValueWriter::Decimal(p, _s), Datum::Decimal(v)) => {
+                writer.write_decimal(v, *p);
+            }
+            (InnerValueWriter::Date, Datum::Date(d)) => {
+                writer.write_int(d.get_inner());
+            }
+            (InnerValueWriter::Time(p), Datum::Time(t)) => {
+                writer.write_time(t.get_inner(), *p);
+            }
+            (InnerValueWriter::TimestampNtz(p), Datum::TimestampNtz(ts)) => {
+                writer.write_timestamp_ntz(ts, *p);
+            }
+            (InnerValueWriter::TimestampLtz(p), Datum::TimestampLtz(ts)) => {
+                writer.write_timestamp_ltz(ts, *p);
+            }
             _ => {
                 return Err(IllegalArgument {
                     message: format!("{self:?} used to write value {value:?}"),
diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs
index 90437c1..615e038 100644
--- a/crates/fluss/src/row/column.rs
+++ b/crates/fluss/src/row/column.rs
@@ -17,9 +17,10 @@
 
 use crate::row::InternalRow;
 use arrow::array::{
-    AsArray, BinaryArray, FixedSizeBinaryArray, Float32Array, Float64Array, 
Int8Array, Int16Array,
-    Int32Array, Int64Array, RecordBatch, StringArray,
+    Array, AsArray, BinaryArray, Decimal128Array, FixedSizeBinaryArray, 
Float32Array, Float64Array,
+    Int8Array, Int16Array, Int32Array, Int64Array, RecordBatch, StringArray,
 };
+use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
 use std::sync::Arc;
 
 #[derive(Clone)]
@@ -54,6 +55,49 @@ impl ColumnarRow {
     pub fn get_record_batch(&self) -> &RecordBatch {
         &self.record_batch
     }
+
+    /// Generic helper to read timestamp from Arrow, handling all TimeUnit 
conversions.
+    /// Like Java, the precision parameter is ignored - conversion is 
determined by Arrow TimeUnit.
+    fn read_timestamp_from_arrow<T>(
+        &self,
+        pos: usize,
+        _precision: u32,
+        construct_compact: impl FnOnce(i64) -> T,
+        construct_with_nanos: impl FnOnce(i64, i32) -> crate::error::Result<T>,
+    ) -> T {
+        let schema = self.record_batch.schema();
+        let arrow_field = schema.field(pos);
+        let value = self.get_long(pos);
+
+        match arrow_field.data_type() {
+            ArrowDataType::Timestamp(time_unit, _) => {
+                // Convert based on Arrow TimeUnit
+                let (millis, nanos) = match time_unit {
+                    TimeUnit::Second => (value * 1000, 0),
+                    TimeUnit::Millisecond => (value, 0),
+                    TimeUnit::Microsecond => {
+                        let millis = value / 1000;
+                        let nanos = ((value % 1000) * 1000) as i32;
+                        (millis, nanos)
+                    }
+                    TimeUnit::Nanosecond => {
+                        let millis = value / 1_000_000;
+                        let nanos = (value % 1_000_000) as i32;
+                        (millis, nanos)
+                    }
+                };
+
+                if nanos == 0 {
+                    construct_compact(millis)
+                } else {
+                    // nanos is guaranteed to be in valid range [0, 999_999] 
by arithmetic
+                    construct_with_nanos(millis, nanos)
+                        .expect("nanos in valid range by construction")
+                }
+            }
+            other => panic!("Expected Timestamp column at position {pos}, got 
{other:?}"),
+        }
+    }
 }
 
 impl InternalRow for ColumnarRow {
@@ -126,6 +170,88 @@ impl InternalRow for ColumnarRow {
             .value(self.row_id)
     }
 
+    fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> 
crate::row::Decimal {
+        use arrow::datatypes::DataType;
+
+        let column = self.record_batch.column(pos);
+        let array = column
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap_or_else(|| {
+                panic!(
+                    "Expected Decimal128Array at column {}, found: {:?}",
+                    pos,
+                    column.data_type()
+                )
+            });
+
+        // Contract: caller must check is_null_at() before calling get_decimal.
+        // Calling on null value violates the contract and returns garbage data
+        debug_assert!(
+            !array.is_null(self.row_id),
+            "get_decimal called on null value at pos {} row {}",
+            pos,
+            self.row_id
+        );
+
+        // Read scale from Arrow schema field metadata
+        let schema = self.record_batch.schema();
+        let field = schema.field(pos);
+        let arrow_scale = match field.data_type() {
+            DataType::Decimal128(_p, s) => *s as i64,
+            dt => panic!(
+                "Expected Decimal128 data type at column {}, found: {:?}",
+                pos, dt
+            ),
+        };
+
+        let i128_val = array.value(self.row_id);
+
+        // Convert Arrow Decimal128 to Fluss Decimal (handles rescaling and 
validation)
+        crate::row::Decimal::from_arrow_decimal128(
+            i128_val,
+            arrow_scale,
+            precision as u32,
+            scale as u32,
+        )
+        .unwrap_or_else(|e| {
+            panic!(
+                "Failed to create Decimal at column {} row {}: {}",
+                pos, self.row_id, e
+            )
+        })
+    }
+
+    fn get_date(&self, pos: usize) -> crate::row::datum::Date {
+        crate::row::datum::Date::new(self.get_int(pos))
+    }
+
+    fn get_time(&self, pos: usize) -> crate::row::datum::Time {
+        crate::row::datum::Time::new(self.get_int(pos))
+    }
+
+    fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> 
crate::row::datum::TimestampNtz {
+        // Like Java's ArrowTimestampNtzColumnVector, we ignore the precision 
parameter
+        // and determine the conversion from the Arrow column's TimeUnit.
+        self.read_timestamp_from_arrow(
+            pos,
+            precision,
+            crate::row::datum::TimestampNtz::new,
+            crate::row::datum::TimestampNtz::from_millis_nanos,
+        )
+    }
+
+    fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> 
crate::row::datum::TimestampLtz {
+        // Like Java's ArrowTimestampLtzColumnVector, we ignore the precision 
parameter
+        // and determine the conversion from the Arrow column's TimeUnit.
+        self.read_timestamp_from_arrow(
+            pos,
+            precision,
+            crate::row::datum::TimestampLtz::new,
+            crate::row::datum::TimestampLtz::from_millis_nanos,
+        )
+    }
+
     fn get_char(&self, pos: usize, _length: usize) -> &str {
         let array = self
             .record_batch
@@ -229,4 +355,72 @@ mod tests {
         row.set_row_id(0);
         assert_eq!(row.get_row_id(), 0);
     }
+
+    #[test]
+    fn columnar_row_reads_decimal() {
+        use arrow::datatypes::DataType;
+        use bigdecimal::{BigDecimal, num_bigint::BigInt};
+
+        // Test with Decimal128
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("dec1", DataType::Decimal128(10, 2), false),
+            Field::new("dec2", DataType::Decimal128(20, 5), false),
+            Field::new("dec3", DataType::Decimal128(38, 10), false),
+        ]));
+
+        // Create decimal values: 123.45, 12345.67890, large decimal
+        let dec1_val = 12345i128; // 123.45 with scale 2
+        let dec2_val = 1234567890i128; // 12345.67890 with scale 5
+        let dec3_val = 999999999999999999i128; // Large value (18 nines) with 
scale 10
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(
+                    Decimal128Array::from(vec![dec1_val])
+                        .with_precision_and_scale(10, 2)
+                        .unwrap(),
+                ),
+                Arc::new(
+                    Decimal128Array::from(vec![dec2_val])
+                        .with_precision_and_scale(20, 5)
+                        .unwrap(),
+                ),
+                Arc::new(
+                    Decimal128Array::from(vec![dec3_val])
+                        .with_precision_and_scale(38, 10)
+                        .unwrap(),
+                ),
+            ],
+        )
+        .expect("record batch");
+
+        let row = ColumnarRow::new(Arc::new(batch));
+        assert_eq!(row.get_field_count(), 3);
+
+        // Verify decimal values
+        assert_eq!(
+            row.get_decimal(0, 10, 2),
+            
crate::row::Decimal::from_big_decimal(BigDecimal::new(BigInt::from(12345), 2), 
10, 2)
+                .unwrap()
+        );
+        assert_eq!(
+            row.get_decimal(1, 20, 5),
+            crate::row::Decimal::from_big_decimal(
+                BigDecimal::new(BigInt::from(1234567890), 5),
+                20,
+                5
+            )
+            .unwrap()
+        );
+        assert_eq!(
+            row.get_decimal(2, 38, 10),
+            crate::row::Decimal::from_big_decimal(
+                BigDecimal::new(BigInt::from(999999999999999999i128), 10),
+                38,
+                10
+            )
+            .unwrap()
+        );
+    }
 }
diff --git a/crates/fluss/src/row/compacted/compacted_key_writer.rs 
b/crates/fluss/src/row/compacted/compacted_key_writer.rs
index 1152b0c..339e366 100644
--- a/crates/fluss/src/row/compacted/compacted_key_writer.rs
+++ b/crates/fluss/src/row/compacted/compacted_key_writer.rs
@@ -20,6 +20,7 @@ use bytes::Bytes;
 
 use crate::error::Result;
 use crate::metadata::DataType;
+use crate::row::Decimal;
 use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter};
 use delegate::delegate;
 
@@ -93,7 +94,13 @@ impl BinaryWriter for CompactedKeyWriter {
 
             fn write_double(&mut self, value: f64);
 
+            fn write_decimal(&mut self, value: &Decimal, precision: u32);
 
+            fn write_time(&mut self, value: i32, precision: u32);
+
+            fn write_timestamp_ntz(&mut self, value: 
&crate::row::datum::TimestampNtz, precision: u32);
+
+            fn write_timestamp_ltz(&mut self, value: 
&crate::row::datum::TimestampLtz, precision: u32);
         }
     }
 
diff --git a/crates/fluss/src/row/compacted/compacted_row.rs 
b/crates/fluss/src/row/compacted/compacted_row.rs
index 144f898..bc68ea1 100644
--- a/crates/fluss/src/row/compacted/compacted_row.rs
+++ b/crates/fluss/src/row/compacted/compacted_row.rs
@@ -133,6 +133,26 @@ impl<'a> InternalRow for CompactedRow<'a> {
     fn get_bytes(&self, pos: usize) -> &[u8] {
         self.decoded_row().get_bytes(pos)
     }
+
+    fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> 
crate::row::Decimal {
+        self.decoded_row().get_decimal(pos, precision, scale)
+    }
+
+    fn get_date(&self, pos: usize) -> crate::row::datum::Date {
+        self.decoded_row().get_date(pos)
+    }
+
+    fn get_time(&self, pos: usize) -> crate::row::datum::Time {
+        self.decoded_row().get_time(pos)
+    }
+
+    fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> 
crate::row::datum::TimestampNtz {
+        self.decoded_row().get_timestamp_ntz(pos, precision)
+    }
+
+    fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> 
crate::row::datum::TimestampLtz {
+        self.decoded_row().get_timestamp_ltz(pos, precision)
+    }
 }
 
 #[cfg(test)]
@@ -174,7 +194,7 @@ mod tests {
         writer.write_bytes(&[1, 2, 3, 4, 5]);
 
         let bytes = writer.to_bytes();
-        let mut row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
+        let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
 
         assert_eq!(row.get_field_count(), 9);
         assert!(row.get_boolean(0));
@@ -187,70 +207,107 @@ mod tests {
         assert_eq!(row.get_string(7), "Hello World");
         assert_eq!(row.get_bytes(8), &[1, 2, 3, 4, 5]);
 
-        // Test with nulls
-        let row_type = RowType::with_data_types(
-            [
-                DataType::Int(IntType::new()),
-                DataType::String(StringType::new()),
-                DataType::Double(DoubleType::new()),
-            ]
-            .to_vec(),
-        );
+        // Test with nulls and negative values
+        let row_type = RowType::with_data_types(vec![
+            DataType::Int(IntType::new()),
+            DataType::String(StringType::new()),
+            DataType::Double(DoubleType::new()),
+        ]);
 
         let mut writer = CompactedRowWriter::new(row_type.fields().len());
-
-        writer.write_int(100);
+        writer.write_int(-42);
         writer.set_null_at(1);
         writer.write_double(2.71);
 
         let bytes = writer.to_bytes();
-        row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
+        let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
 
         assert!(!row.is_null_at(0));
         assert!(row.is_null_at(1));
         assert!(!row.is_null_at(2));
-        assert_eq!(row.get_int(0), 100);
+        assert_eq!(row.get_int(0), -42);
         assert_eq!(row.get_double(2), 2.71);
+        // Verify caching works on repeated reads
+        assert_eq!(row.get_int(0), -42);
+    }
 
-        // Test multiple reads (caching)
-        assert_eq!(row.get_int(0), 100);
-        assert_eq!(row.get_int(0), 100);
+    #[test]
+    fn test_compacted_row_temporal_and_decimal_types() {
+        // Comprehensive test covering DATE, TIME, TIMESTAMP 
(compact/non-compact), and DECIMAL (compact/non-compact)
+        use crate::metadata::{DataTypes, DecimalType, TimestampLTzType, 
TimestampType};
+        use crate::row::Decimal;
+        use crate::row::datum::{TimestampLtz, TimestampNtz};
+        use bigdecimal::{BigDecimal, num_bigint::BigInt};
 
-        // Test from_bytes
         let row_type = RowType::with_data_types(vec![
-            DataType::Int(IntType::new()),
-            DataType::String(StringType::new()),
+            DataTypes::date(),
+            DataTypes::time(),
+            DataType::Timestamp(TimestampType::with_nullable(true, 
3).unwrap()), // Compact (precision <= 3)
+            DataType::TimestampLTz(TimestampLTzType::with_nullable(true, 
3).unwrap()), // Compact
+            DataType::Timestamp(TimestampType::with_nullable(true, 
6).unwrap()), // Non-compact (precision > 3)
+            DataType::TimestampLTz(TimestampLTzType::with_nullable(true, 
9).unwrap()), // Non-compact
+            DataType::Decimal(DecimalType::new(10, 2).unwrap()), // Compact 
(precision <= 18)
+            DataType::Decimal(DecimalType::new(28, 10).unwrap()), // 
Non-compact (precision > 18)
         ]);
 
         let mut writer = CompactedRowWriter::new(row_type.fields().len());
-        writer.write_int(-1);
-        writer.write_string("test");
-
-        let bytes = writer.to_bytes();
-        let mut row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
-
-        assert_eq!(row.get_int(0), -1);
-        assert_eq!(row.get_string(1), "test");
 
-        // Test large row
-        let num_fields = 100;
-        let row_type = RowType::with_data_types(
-            (0..num_fields)
-                .map(|_| DataType::Int(IntType::new()))
-                .collect(),
-        );
-
-        let mut writer = CompactedRowWriter::new(num_fields);
+        // Write values
+        writer.write_int(19651); // Date: 2023-10-25
+        writer.write_time(34200000, 0); // Time: 09:30:00.0
+        writer.write_timestamp_ntz(&TimestampNtz::new(1698235273182), 3); // 
Compact timestamp
+        writer.write_timestamp_ltz(&TimestampLtz::new(1698235273182), 3); // 
Compact timestamp ltz
+        let ts_ntz_high = TimestampNtz::from_millis_nanos(1698235273182, 
123456).unwrap();
+        let ts_ltz_high = TimestampLtz::from_millis_nanos(1698235273182, 
987654).unwrap();
+        writer.write_timestamp_ntz(&ts_ntz_high, 6); // Non-compact timestamp 
with nanos
+        writer.write_timestamp_ltz(&ts_ltz_high, 9); // Non-compact timestamp 
ltz with nanos
+
+        // Create Decimal values for testing
+        let small_decimal =
+            Decimal::from_big_decimal(BigDecimal::new(BigInt::from(12345), 2), 
10, 2).unwrap(); // Compact decimal: 123.45
+        let large_decimal = Decimal::from_big_decimal(
+            BigDecimal::new(BigInt::from(999999999999999999i128), 10),
+            28,
+            10,
+        )
+        .unwrap(); // Non-compact decimal
 
-        for i in 0..num_fields {
-            writer.write_int((i * 10) as i32);
-        }
+        writer.write_decimal(&small_decimal, 10);
+        writer.write_decimal(&large_decimal, 28);
 
         let bytes = writer.to_bytes();
-        row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
-
-        for i in 0..num_fields {
-            assert_eq!(row.get_int(i), (i * 10) as i32);
-        }
+        let row = CompactedRow::from_bytes(&row_type, bytes.as_ref());
+
+        // Verify all values
+        assert_eq!(row.get_date(0).get_inner(), 19651);
+        assert_eq!(row.get_time(1).get_inner(), 34200000);
+        assert_eq!(row.get_timestamp_ntz(2, 3).get_millisecond(), 
1698235273182);
+        assert_eq!(
+            row.get_timestamp_ltz(3, 3).get_epoch_millisecond(),
+            1698235273182
+        );
+        let read_ts_ntz = row.get_timestamp_ntz(4, 6);
+        assert_eq!(read_ts_ntz.get_millisecond(), 1698235273182);
+        assert_eq!(read_ts_ntz.get_nano_of_millisecond(), 123456);
+        let read_ts_ltz = row.get_timestamp_ltz(5, 9);
+        assert_eq!(read_ts_ltz.get_epoch_millisecond(), 1698235273182);
+        assert_eq!(read_ts_ltz.get_nano_of_millisecond(), 987654);
+        // Assert on Decimal equality
+        assert_eq!(row.get_decimal(6, 10, 2), small_decimal);
+        assert_eq!(row.get_decimal(7, 28, 10), large_decimal);
+
+        // Assert on Decimal components to catch any regressions
+        let read_small_decimal = row.get_decimal(6, 10, 2);
+        assert_eq!(read_small_decimal.precision(), 10);
+        assert_eq!(read_small_decimal.scale(), 2);
+        assert_eq!(read_small_decimal.to_unscaled_long().unwrap(), 12345);
+
+        let read_large_decimal = row.get_decimal(7, 28, 10);
+        assert_eq!(read_large_decimal.precision(), 28);
+        assert_eq!(read_large_decimal.scale(), 10);
+        assert_eq!(
+            read_large_decimal.to_unscaled_long().unwrap(),
+            999999999999999999i64
+        );
     }
 }
diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs 
b/crates/fluss/src/row/compacted/compacted_row_reader.rs
index 408706c..40470db 100644
--- a/crates/fluss/src/row/compacted/compacted_row_reader.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs
@@ -19,7 +19,7 @@ use crate::metadata::RowType;
 use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
 use crate::{
     metadata::DataType,
-    row::{Datum, GenericRow, 
compacted::compacted_row_writer::CompactedRowWriter},
+    row::{Datum, Decimal, GenericRow, 
compacted::compacted_row_writer::CompactedRowWriter},
     util::varint::{read_unsigned_varint_at, read_unsigned_varint_u64_at},
 };
 use std::borrow::Cow;
@@ -97,7 +97,75 @@ impl<'a> CompactedRowDeserializer<'a> {
                     let (val, next) = reader.read_bytes(cursor);
                     (Datum::Blob(val.into()), next)
                 }
-                _ => panic!("unsupported DataType in 
CompactedRowDeserializer"),
+                DataType::Decimal(decimal_type) => {
+                    let precision = decimal_type.precision();
+                    let scale = decimal_type.scale();
+                    if Decimal::is_compact_precision(precision) {
+                        // Compact: stored as i64
+                        let (val, next) = reader.read_long(cursor);
+                        let decimal = Decimal::from_unscaled_long(val, 
precision, scale)
+                            .expect("Failed to create decimal from unscaled 
long");
+                        (Datum::Decimal(decimal), next)
+                    } else {
+                        // Non-compact: stored as minimal big-endian bytes
+                        let (bytes, next) = reader.read_bytes(cursor);
+                        let decimal = Decimal::from_unscaled_bytes(bytes, 
precision, scale)
+                            .expect("Failed to create decimal from unscaled 
bytes");
+                        (Datum::Decimal(decimal), next)
+                    }
+                }
+                DataType::Date(_) => {
+                    let (val, next) = reader.read_int(cursor);
+                    (Datum::Date(crate::row::datum::Date::new(val)), next)
+                }
+                DataType::Time(_) => {
+                    let (val, next) = reader.read_int(cursor);
+                    (Datum::Time(crate::row::datum::Time::new(val)), next)
+                }
+                DataType::Timestamp(timestamp_type) => {
+                    let precision = timestamp_type.precision();
+                    if crate::row::datum::TimestampNtz::is_compact(precision) {
+                        // Compact: only milliseconds
+                        let (millis, next) = reader.read_long(cursor);
+                        (
+                            
Datum::TimestampNtz(crate::row::datum::TimestampNtz::new(millis)),
+                            next,
+                        )
+                    } else {
+                        // Non-compact: milliseconds + nanos
+                        let (millis, mid) = reader.read_long(cursor);
+                        let (nanos, next) = reader.read_int(mid);
+                        let timestamp =
+                            
crate::row::datum::TimestampNtz::from_millis_nanos(millis, nanos)
+                                .expect("Invalid nano_of_millisecond value in 
compacted row");
+                        (Datum::TimestampNtz(timestamp), next)
+                    }
+                }
+                DataType::TimestampLTz(timestamp_ltz_type) => {
+                    let precision = timestamp_ltz_type.precision();
+                    if crate::row::datum::TimestampLtz::is_compact(precision) {
+                        // Compact: only epoch milliseconds
+                        let (epoch_millis, next) = reader.read_long(cursor);
+                        (
+                            
Datum::TimestampLtz(crate::row::datum::TimestampLtz::new(epoch_millis)),
+                            next,
+                        )
+                    } else {
+                        // Non-compact: epoch milliseconds + nanos
+                        let (epoch_millis, mid) = reader.read_long(cursor);
+                        let (nanos, next) = reader.read_int(mid);
+                        let timestamp_ltz =
+                            
crate::row::datum::TimestampLtz::from_millis_nanos(epoch_millis, nanos)
+                                .expect("Invalid nano_of_millisecond value in 
compacted row");
+                        (Datum::TimestampLtz(timestamp_ltz), next)
+                    }
+                }
+                _ => {
+                    panic!(
+                        "Unsupported DataType in CompactedRowDeserializer: 
{:?}",
+                        dtype
+                    );
+                }
             };
             cursor = next_cursor;
             row.set_field(col_pos, datum);
diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs 
b/crates/fluss/src/row/compacted/compacted_row_writer.rs
index c130e94..d1ad047 100644
--- a/crates/fluss/src/row/compacted/compacted_row_writer.rs
+++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::row::Decimal;
 use crate::row::binary::BinaryWriter;
 use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes;
 use crate::util::varint::{write_unsigned_varint_to_slice, 
write_unsigned_varint_u64_to_slice};
@@ -76,6 +77,7 @@ impl CompactedRowWriter {
         self.position = end;
     }
 }
+
 impl BinaryWriter for CompactedRowWriter {
     fn reset(&mut self) {
         self.position = self.header_size_in_bytes;
@@ -91,32 +93,34 @@ impl BinaryWriter for CompactedRowWriter {
 
     fn write_boolean(&mut self, value: bool) {
         let b = if value { 1u8 } else { 0u8 };
-        self.write_raw(&[b]);
+        self.write_raw(&[b])
     }
 
     fn write_byte(&mut self, value: u8) {
-        self.write_raw(&[value]);
+        self.write_raw(&[value])
     }
 
     fn write_bytes(&mut self, value: &[u8]) {
-        let len_i32 =
-            i32::try_from(value.len()).expect("byte slice too large to encode 
length as i32");
+        let len_i32 = i32::try_from(value.len())
+            .expect("Byte slice too large to encode length as i32: exceeds 
i32::MAX");
         self.write_int(len_i32);
-        self.write_raw(value);
+        self.write_raw(value)
     }
 
     fn write_char(&mut self, value: &str, _length: usize) {
         // TODO: currently, we encoding CHAR(length) as the same with STRING, 
the length info can be
         //  omitted and the bytes length should be enforced in the future.
-        self.write_string(value);
+        self.write_string(value)
     }
 
     fn write_string(&mut self, value: &str) {
-        self.write_bytes(value.as_ref());
+        self.write_bytes(value.as_ref())
     }
 
     fn write_short(&mut self, value: i16) {
-        self.write_raw(&value.to_ne_bytes());
+        // Use native endianness to match Java's UnsafeUtils.putShort behavior
+        // Java uses sun.misc.Unsafe which writes in native byte order 
(typically LE on x86/ARM)
+        self.write_raw(&value.to_ne_bytes())
     }
 
     fn write_int(&mut self, value: i32) {
@@ -132,21 +136,120 @@ impl BinaryWriter for CompactedRowWriter {
             write_unsigned_varint_u64_to_slice(value as u64, &mut 
self.buffer[self.position..]);
         self.position += bytes_written;
     }
+
     fn write_float(&mut self, value: f32) {
-        self.write_raw(&value.to_ne_bytes());
+        // Use native endianness to match Java's UnsafeUtils.putFloat behavior
+        self.write_raw(&value.to_ne_bytes())
     }
 
     fn write_double(&mut self, value: f64) {
-        self.write_raw(&value.to_ne_bytes());
+        // Use native endianness to match Java's UnsafeUtils.putDouble behavior
+        self.write_raw(&value.to_ne_bytes())
     }
 
     fn write_binary(&mut self, bytes: &[u8], length: usize) {
         // TODO: currently, we encoding BINARY(length) as the same with BYTES, 
the length info can
         //  be omitted and the bytes length should be enforced in the future.
-        self.write_bytes(&bytes[..length.min(bytes.len())]);
+        self.write_bytes(&bytes[..length.min(bytes.len())])
     }
 
     fn complete(&mut self) {
         // do nothing
     }
+
+    fn write_decimal(&mut self, value: &Decimal, precision: u32) {
+        // Decimal is already validated and rescaled during construction.
+        // Just serialize the precomputed unscaled representation.
+        if Decimal::is_compact_precision(precision) {
+            self.write_long(
+                value
+                    .to_unscaled_long()
+                    .expect("Decimal should fit in i64 for compact precision"),
+            )
+        } else {
+            self.write_bytes(&value.to_unscaled_bytes())
+        }
+    }
+
+    fn write_time(&mut self, value: i32, _precision: u32) {
+        // TIME is always encoded as i32 (milliseconds since midnight) 
regardless of precision
+        self.write_int(value)
+    }
+
+    fn write_timestamp_ntz(&mut self, value: &crate::row::datum::TimestampNtz, 
precision: u32) {
+        if crate::row::datum::TimestampNtz::is_compact(precision) {
+            // Compact: write only milliseconds
+            self.write_long(value.get_millisecond());
+        } else {
+            // Non-compact: write milliseconds + nanoOfMillisecond
+            self.write_long(value.get_millisecond());
+            self.write_int(value.get_nano_of_millisecond());
+        }
+    }
+
+    fn write_timestamp_ltz(&mut self, value: &crate::row::datum::TimestampLtz, 
precision: u32) {
+        if crate::row::datum::TimestampLtz::is_compact(precision) {
+            // Compact: write only epoch milliseconds
+            self.write_long(value.get_epoch_millisecond());
+        } else {
+            // Non-compact: write epoch milliseconds + nanoOfMillisecond
+            self.write_long(value.get_epoch_millisecond());
+            self.write_int(value.get_nano_of_millisecond());
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use bigdecimal::{BigDecimal, num_bigint::BigInt};
+
+    #[test]
+    fn test_write_decimal_compact() {
+        // Compact decimal (precision <= 18)
+        let bd = BigDecimal::new(BigInt::from(12345), 2); // 123.45
+        let decimal = Decimal::from_big_decimal(bd, 10, 2).unwrap();
+
+        let mut w = CompactedRowWriter::new(1);
+        w.write_decimal(&decimal, 10);
+
+        let (val, _) = crate::util::varint::read_unsigned_varint_u64_at(
+            w.buffer(),
+            w.header_size_in_bytes,
+            CompactedRowWriter::MAX_LONG_SIZE,
+        )
+        .unwrap();
+        assert_eq!(val as i64, 12345);
+    }
+
+    #[test]
+    fn test_write_decimal_rounding() {
+        // Test HALF_UP rounding: 12.345 → 12.35
+        let bd = BigDecimal::new(BigInt::from(12345), 3);
+        let decimal = Decimal::from_big_decimal(bd, 10, 2).unwrap();
+
+        let mut w = CompactedRowWriter::new(1);
+        w.write_decimal(&decimal, 10);
+
+        let (val, _) = crate::util::varint::read_unsigned_varint_u64_at(
+            w.buffer(),
+            w.header_size_in_bytes,
+            CompactedRowWriter::MAX_LONG_SIZE,
+        )
+        .unwrap();
+        assert_eq!(val as i64, 1235); // 12.35 with scale 2
+    }
+
+    #[test]
+    fn test_write_decimal_non_compact() {
+        // Non-compact (precision > 18): uses byte array
+        let bd = BigDecimal::new(BigInt::from(12345), 0);
+        let decimal = Decimal::from_big_decimal(bd, 28, 0).unwrap();
+
+        let mut w = CompactedRowWriter::new(1);
+        w.write_decimal(&decimal, 28);
+
+        // Verify something was written (at least length varint + some bytes)
+        assert!(w.position() > w.header_size_in_bytes);
+    }
 }
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index ad7948d..5b21b38 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -17,6 +17,7 @@
 
 use crate::error::Error::RowConvertError;
 use crate::error::Result;
+use crate::row::Decimal;
 use arrow::array::{
     ArrayBuilder, BinaryBuilder, BooleanBuilder, Float32Builder, 
Float64Builder, Int8Builder,
     Int16Builder, Int32Builder, Int64Builder, StringBuilder,
@@ -24,7 +25,6 @@ use arrow::array::{
 use jiff::ToSpan;
 use ordered_float::OrderedFloat;
 use parse_display::Display;
-use rust_decimal::Decimal;
 use serde::Serialize;
 use std::borrow::Cow;
 
@@ -58,9 +58,11 @@ pub enum Datum<'a> {
     #[display("{0}")]
     Date(Date),
     #[display("{0}")]
-    Timestamp(Timestamp),
+    Time(Time),
     #[display("{0}")]
-    TimestampTz(TimestampLtz),
+    TimestampNtz(TimestampNtz),
+    #[display("{0}")]
+    TimestampLtz(TimestampLtz),
 }
 
 impl Datum<'_> {
@@ -296,7 +298,11 @@ impl Datum<'_> {
             Datum::Float64(v) => append_value_to_arrow!(Float64Builder, 
v.into_inner()),
             Datum::String(v) => append_value_to_arrow!(StringBuilder, 
v.as_ref()),
             Datum::Blob(v) => append_value_to_arrow!(BinaryBuilder, 
v.as_ref()),
-            Datum::Decimal(_) | Datum::Date(_) | Datum::Timestamp(_) | 
Datum::TimestampTz(_) => {
+            Datum::Decimal(_)
+            | Datum::Date(_)
+            | Datum::Time(_)
+            | Datum::TimestampNtz(_)
+            | Datum::TimestampLtz(_) => {
                 return Err(RowConvertError {
                     message: format!(
                         "Type {:?} is not yet supported for Arrow conversion",
@@ -350,10 +356,122 @@ pub type F64 = OrderedFloat<f64>;
 pub struct Date(i32);
 
 #[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, 
Hash, Serialize)]
-pub struct Timestamp(i64);
+pub struct Time(i32);
+
+impl Time {
+    pub const fn new(inner: i32) -> Self {
+        Time(inner)
+    }
+
+    /// Get the inner value of time type (milliseconds since midnight)
+    pub fn get_inner(&self) -> i32 {
+        self.0
+    }
+}
+
+/// Maximum timestamp precision that can be stored compactly (milliseconds 
only).
+/// Values with precision > MAX_COMPACT_TIMESTAMP_PRECISION require additional 
nanosecond storage.
+pub const MAX_COMPACT_TIMESTAMP_PRECISION: u32 = 3;
+
+/// Maximum valid value for nanoseconds within a millisecond (0 to 999,999 
inclusive).
+/// A millisecond contains 1,000,000 nanoseconds, so the fractional part 
ranges from 0 to 999,999.
+pub const MAX_NANO_OF_MILLISECOND: i32 = 999_999;
 
 #[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, 
Hash, Serialize)]
-pub struct TimestampLtz(i64);
+#[display("{millisecond}")]
+pub struct TimestampNtz {
+    millisecond: i64,
+    nano_of_millisecond: i32,
+}
+
+impl TimestampNtz {
+    pub const fn new(millisecond: i64) -> Self {
+        TimestampNtz {
+            millisecond,
+            nano_of_millisecond: 0,
+        }
+    }
+
+    pub fn from_millis_nanos(
+        millisecond: i64,
+        nano_of_millisecond: i32,
+    ) -> crate::error::Result<Self> {
+        if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) {
+            return Err(crate::error::Error::IllegalArgument {
+                message: format!(
+                    "nanoOfMillisecond must be in range [0, {}], got: {}",
+                    MAX_NANO_OF_MILLISECOND, nano_of_millisecond
+                ),
+            });
+        }
+        Ok(TimestampNtz {
+            millisecond,
+            nano_of_millisecond,
+        })
+    }
+
+    pub fn get_millisecond(&self) -> i64 {
+        self.millisecond
+    }
+
+    pub fn get_nano_of_millisecond(&self) -> i32 {
+        self.nano_of_millisecond
+    }
+
+    /// Check if the timestamp is compact based on precision.
+    /// Precision <= MAX_COMPACT_TIMESTAMP_PRECISION means millisecond 
precision, no need for nanos.
+    pub fn is_compact(precision: u32) -> bool {
+        precision <= MAX_COMPACT_TIMESTAMP_PRECISION
+    }
+}
+
+#[derive(PartialOrd, Ord, Display, PartialEq, Eq, Debug, Copy, Clone, Default, 
Hash, Serialize)]
+#[display("{epoch_millisecond}")]
+pub struct TimestampLtz {
+    epoch_millisecond: i64,
+    nano_of_millisecond: i32,
+}
+
+impl TimestampLtz {
+    pub const fn new(epoch_millisecond: i64) -> Self {
+        TimestampLtz {
+            epoch_millisecond,
+            nano_of_millisecond: 0,
+        }
+    }
+
+    pub fn from_millis_nanos(
+        epoch_millisecond: i64,
+        nano_of_millisecond: i32,
+    ) -> crate::error::Result<Self> {
+        if !(0..=MAX_NANO_OF_MILLISECOND).contains(&nano_of_millisecond) {
+            return Err(crate::error::Error::IllegalArgument {
+                message: format!(
+                    "nanoOfMillisecond must be in range [0, {}], got: {}",
+                    MAX_NANO_OF_MILLISECOND, nano_of_millisecond
+                ),
+            });
+        }
+        Ok(TimestampLtz {
+            epoch_millisecond,
+            nano_of_millisecond,
+        })
+    }
+
+    pub fn get_epoch_millisecond(&self) -> i64 {
+        self.epoch_millisecond
+    }
+
+    pub fn get_nano_of_millisecond(&self) -> i32 {
+        self.nano_of_millisecond
+    }
+
+    /// Check if the timestamp is compact based on precision.
+    /// Precision <= MAX_COMPACT_TIMESTAMP_PRECISION means millisecond 
precision, no need for nanos.
+    pub fn is_compact(precision: u32) -> bool {
+        precision <= MAX_COMPACT_TIMESTAMP_PRECISION
+    }
+}
 
 pub type Blob<'a> = Cow<'a, [u8]>;
 
@@ -461,3 +579,54 @@ mod tests {
         assert_eq!(date.day(), 1);
     }
 }
+
+#[cfg(test)]
+mod timestamp_tests {
+    use super::*;
+
+    #[test]
+    fn test_timestamp_valid_nanos() {
+        // Valid range: 0 to MAX_NANO_OF_MILLISECOND for both TimestampNtz and 
TimestampLtz
+        let ntz1 = TimestampNtz::from_millis_nanos(1000, 0).unwrap();
+        assert_eq!(ntz1.get_nano_of_millisecond(), 0);
+
+        let ntz2 = TimestampNtz::from_millis_nanos(1000, 
MAX_NANO_OF_MILLISECOND).unwrap();
+        assert_eq!(ntz2.get_nano_of_millisecond(), MAX_NANO_OF_MILLISECOND);
+
+        let ntz3 = TimestampNtz::from_millis_nanos(1000, 500_000).unwrap();
+        assert_eq!(ntz3.get_nano_of_millisecond(), 500_000);
+
+        let ltz1 = TimestampLtz::from_millis_nanos(1000, 0).unwrap();
+        assert_eq!(ltz1.get_nano_of_millisecond(), 0);
+
+        let ltz2 = TimestampLtz::from_millis_nanos(1000, 
MAX_NANO_OF_MILLISECOND).unwrap();
+        assert_eq!(ltz2.get_nano_of_millisecond(), MAX_NANO_OF_MILLISECOND);
+    }
+
+    #[test]
+    fn test_timestamp_nanos_out_of_range() {
+        // Test that both TimestampNtz and TimestampLtz reject invalid nanos
+        let expected_msg = format!(
+            "nanoOfMillisecond must be in range [0, {}]",
+            MAX_NANO_OF_MILLISECOND
+        );
+
+        // Too large (1,000,000 is just beyond the valid range)
+        let result_ntz = TimestampNtz::from_millis_nanos(1000, 
MAX_NANO_OF_MILLISECOND + 1);
+        assert!(result_ntz.is_err());
+        assert!(result_ntz.unwrap_err().to_string().contains(&expected_msg));
+
+        let result_ltz = TimestampLtz::from_millis_nanos(1000, 
MAX_NANO_OF_MILLISECOND + 1);
+        assert!(result_ltz.is_err());
+        assert!(result_ltz.unwrap_err().to_string().contains(&expected_msg));
+
+        // Negative
+        let result_ntz = TimestampNtz::from_millis_nanos(1000, -1);
+        assert!(result_ntz.is_err());
+        assert!(result_ntz.unwrap_err().to_string().contains(&expected_msg));
+
+        let result_ltz = TimestampLtz::from_millis_nanos(1000, -1);
+        assert!(result_ltz.is_err());
+        assert!(result_ltz.unwrap_err().to_string().contains(&expected_msg));
+    }
+}
diff --git a/crates/fluss/src/row/decimal.rs b/crates/fluss/src/row/decimal.rs
new file mode 100644
index 0000000..b14bde5
--- /dev/null
+++ b/crates/fluss/src/row/decimal.rs
@@ -0,0 +1,477 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{Error, Result};
+use bigdecimal::num_bigint::BigInt;
+use bigdecimal::num_traits::Zero;
+use bigdecimal::{BigDecimal, RoundingMode};
+use std::fmt;
+
+#[cfg(test)]
+use std::str::FromStr;
+
+/// Maximum decimal precision that can be stored compactly as a single i64.
+/// Values with precision > MAX_COMPACT_PRECISION require byte array storage.
+pub const MAX_COMPACT_PRECISION: u32 = 18;
+
+/// An internal data structure representing a decimal value with fixed 
precision and scale.
+///
+/// This data structure is immutable and stores decimal values in a compact 
representation
+/// (as a long value) if values are small enough (precision ≤ 18).
+///
+/// Matches Java's org.apache.fluss.row.Decimal class.
+#[derive(Debug, Clone, serde::Serialize)]
+pub struct Decimal {
+    precision: u32,
+    scale: u32,
+    // If precision <= MAX_COMPACT_PRECISION, this holds the unscaled value
+    long_val: Option<i64>,
+    // BigDecimal representation (may be cached)
+    decimal_val: Option<BigDecimal>,
+}
+
+impl Decimal {
+    /// Returns the precision of this Decimal.
+    ///
+    /// The precision is the number of digits in the unscaled value.
+    pub fn precision(&self) -> u32 {
+        self.precision
+    }
+
+    /// Returns the scale of this Decimal.
+    pub fn scale(&self) -> u32 {
+        self.scale
+    }
+
+    /// Returns whether the decimal value is small enough to be stored in a 
long.
+    pub fn is_compact(&self) -> bool {
+        self.precision <= MAX_COMPACT_PRECISION
+    }
+
+    /// Returns whether a given precision can be stored compactly.
+    pub fn is_compact_precision(precision: u32) -> bool {
+        precision <= MAX_COMPACT_PRECISION
+    }
+
+    /// Converts this Decimal into a BigDecimal.
+    pub fn to_big_decimal(&self) -> BigDecimal {
+        if let Some(bd) = &self.decimal_val {
+            bd.clone()
+        } else if let Some(long_val) = self.long_val {
+            BigDecimal::new(BigInt::from(long_val), self.scale as i64)
+        } else {
+            // Should never happen - we always have one representation
+            BigDecimal::new(BigInt::from(0), self.scale as i64)
+        }
+    }
+
+    /// Returns a long describing the unscaled value of this Decimal.
+    pub fn to_unscaled_long(&self) -> Result<i64> {
+        if let Some(long_val) = self.long_val {
+            Ok(long_val)
+        } else {
+            // Extract unscaled value from BigDecimal
+            let bd = self.to_big_decimal();
+            let (unscaled, _) = bd.as_bigint_and_exponent();
+            unscaled.try_into().map_err(|_| Error::IllegalArgument {
+                message: format!(
+                    "Decimal unscaled value does not fit in i64: precision={}",
+                    self.precision
+                ),
+            })
+        }
+    }
+
+    /// Returns a byte array describing the unscaled value of this Decimal.
+    pub fn to_unscaled_bytes(&self) -> Vec<u8> {
+        let bd = self.to_big_decimal();
+        let (unscaled, _) = bd.as_bigint_and_exponent();
+        unscaled.to_signed_bytes_be()
+    }
+
+    /// Creates a Decimal from Arrow's Decimal128 representation.
+    // TODO: For compact decimals with matching scale we may call 
from_unscaled_long
+    pub fn from_arrow_decimal128(
+        i128_val: i128,
+        arrow_scale: i64,
+        precision: u32,
+        scale: u32,
+    ) -> Result<Self> {
+        let bd = BigDecimal::new(BigInt::from(i128_val), arrow_scale);
+        Self::from_big_decimal(bd, precision, scale)
+    }
+
+    /// Creates an instance of Decimal from a BigDecimal with the given 
precision and scale.
+    ///
+    /// The returned decimal value may be rounded to have the desired scale. 
The precision
+    /// will be checked. If the precision overflows, an error is returned.
+    pub fn from_big_decimal(bd: BigDecimal, precision: u32, scale: u32) -> 
Result<Self> {
+        // Rescale to the target scale with HALF_UP rounding (matches Java)
+        let scaled = bd.with_scale_round(scale as i64, RoundingMode::HalfUp);
+
+        // Extract unscaled value
+        let (unscaled, exp) = scaled.as_bigint_and_exponent();
+
+        // Sanity check that scale matches
+        debug_assert_eq!(
+            exp, scale as i64,
+            "Scaled decimal exponent ({}) != expected scale ({})",
+            exp, scale
+        );
+
+        let actual_precision = Self::compute_precision(&unscaled);
+        if actual_precision > precision as usize {
+            return Err(Error::IllegalArgument {
+                message: format!(
+                    "Decimal precision overflow: value has {} digits but 
precision is {} (value: {})",
+                    actual_precision, precision, scaled
+                ),
+            });
+        }
+
+        // Compute compact representation if possible
+        let long_val = if precision <= MAX_COMPACT_PRECISION {
+            Some(i64::try_from(&unscaled).map_err(|_| Error::IllegalArgument {
+                message: format!(
+                    "Decimal mantissa exceeds i64 range for compact precision 
{}: unscaled={} (value={})",
+                    precision, unscaled, scaled
+                ),
+            })?)
+        } else {
+            None
+        };
+
+        Ok(Decimal {
+            precision,
+            scale,
+            long_val,
+            decimal_val: Some(scaled),
+        })
+    }
+
+    /// Creates an instance of Decimal from an unscaled long value with the 
given precision and scale.
+    pub fn from_unscaled_long(unscaled_long: i64, precision: u32, scale: u32) 
-> Result<Self> {
+        if precision > MAX_COMPACT_PRECISION {
+            return Err(Error::IllegalArgument {
+                message: format!(
+                    "Precision {} exceeds MAX_COMPACT_PRECISION ({})",
+                    precision, MAX_COMPACT_PRECISION
+                ),
+            });
+        }
+
+        let actual_precision = 
Self::compute_precision(&BigInt::from(unscaled_long));
+        if actual_precision > precision as usize {
+            return Err(Error::IllegalArgument {
+                message: format!(
+                    "Decimal precision overflow: unscaled value has {} digits 
but precision is {}",
+                    actual_precision, precision
+                ),
+            });
+        }
+
+        Ok(Decimal {
+            precision,
+            scale,
+            long_val: Some(unscaled_long),
+            decimal_val: None,
+        })
+    }
+
+    /// Creates an instance of Decimal from an unscaled byte array with the 
given precision and scale.
+    pub fn from_unscaled_bytes(unscaled_bytes: &[u8], precision: u32, scale: 
u32) -> Result<Self> {
+        let unscaled = BigInt::from_signed_bytes_be(unscaled_bytes);
+        let bd = BigDecimal::new(unscaled, scale as i64);
+        Self::from_big_decimal(bd, precision, scale)
+    }
+
+    /// Computes the precision of a decimal's unscaled value, matching Java's 
BigDecimal.precision().
+    pub fn compute_precision(unscaled: &BigInt) -> usize {
+        if unscaled.is_zero() {
+            return 1;
+        }
+
+        // Count ALL digits in the unscaled value (matches Java's 
BigDecimal.precision())
+        // For bounded precision (≤ 38 digits), string conversion is cheap and 
simple.
+        unscaled.magnitude().to_str_radix(10).len()
+    }
+}
+
+impl fmt::Display for Decimal {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.to_big_decimal())
+    }
+}
+
+// Manual implementations of comparison traits to ignore cached fields
+impl PartialEq for Decimal {
+    fn eq(&self, other: &Self) -> bool {
+        // Use numeric equality like Java's Decimal.equals() which delegates 
to compareTo.
+        // This means 1.0 (scale=1) equals 1.00 (scale=2).
+        self.cmp(other) == std::cmp::Ordering::Equal
+    }
+}
+
+impl Eq for Decimal {}
+
+impl PartialOrd for Decimal {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for Decimal {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        // If both are compact and have the same scale, compare directly
+        if self.is_compact() && other.is_compact() && self.scale == 
other.scale {
+            self.long_val.cmp(&other.long_val)
+        } else {
+            // Otherwise, compare as BigDecimal
+            self.to_big_decimal().cmp(&other.to_big_decimal())
+        }
+    }
+}
+
+impl std::hash::Hash for Decimal {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        // Hash the BigDecimal representation.
+        //
+        // IMPORTANT: Unlike Java's BigDecimal, Rust's bigdecimal crate 
normalizes
+        // before hashing, so hash(1.0) == hash(1.00). Combined with our 
numeric
+        // equality (1.0 == 1.00), this CORRECTLY satisfies the hash/equals 
contract.
+        //
+        // This is BETTER than Java's implementation which has a hash/equals 
violation:
+        // - Java: equals(1.0, 1.00) = true, but hashCode(1.0) != 
hashCode(1.00)
+        // - Rust: equals(1.0, 1.00) = true, and hash(1.0) == hash(1.00) ✓
+        //
+        // Result: HashMap/HashSet will work correctly even if you create 
Decimals
+        // with different scales for the same numeric value (though this is 
rare in
+        // practice since decimals are schema-driven with fixed 
precision/scale).
+        self.to_big_decimal().hash(state);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_precision_calculation() {
+        // Zero is special case
+        assert_eq!(Decimal::compute_precision(&BigInt::from(0)), 1);
+
+        // Must count ALL digits including trailing zeros (matches Java 
BigDecimal.precision())
+        assert_eq!(Decimal::compute_precision(&BigInt::from(10)), 2);
+        assert_eq!(Decimal::compute_precision(&BigInt::from(100)), 3);
+        assert_eq!(Decimal::compute_precision(&BigInt::from(12300)), 5);
+        assert_eq!(
+            Decimal::compute_precision(&BigInt::from(10000000000i64)),
+            11
+        );
+
+        // Test the case: value=1, scale=10 → unscaled=10000000000 (11 digits)
+        let bd = BigDecimal::new(BigInt::from(1), 0);
+        assert!(
+            Decimal::from_big_decimal(bd.clone(), 1, 10).is_err(),
+            "Should reject: unscaled 10000000000 has 11 digits, precision=1 is 
too small"
+        );
+        assert!(
+            Decimal::from_big_decimal(bd, 11, 10).is_ok(),
+            "Should accept with correct precision=11"
+        );
+    }
+
+    /// Test precision validation boundaries
+    #[test]
+    fn test_precision_validation() {
+        let test_cases = vec![
+            (10i64, 1, 2),            // 1.0 → unscaled: 10 (2 digits)
+            (100i64, 2, 3),           // 1.00 → unscaled: 100 (3 digits)
+            (10000000000i64, 10, 11), // 1.0000000000 → unscaled: 10000000000 
(11 digits)
+        ];
+
+        for (unscaled, scale, min_precision) in test_cases {
+            let bd = BigDecimal::new(BigInt::from(unscaled), scale as i64);
+
+            // Reject if precision too small
+            assert!(Decimal::from_big_decimal(bd.clone(), min_precision - 1, 
scale).is_err());
+            // Accept with correct precision
+            assert!(Decimal::from_big_decimal(bd, min_precision, 
scale).is_ok());
+        }
+
+        // i64::MAX has 19 digits, should reject with precision=5
+        let bd = BigDecimal::new(BigInt::from(i64::MAX), 0);
+        assert!(Decimal::from_big_decimal(bd, 5, 0).is_err());
+    }
+
+    /// Test creation and basic operations for both compact and non-compact 
decimals
+    #[test]
+    fn test_creation_and_representation() {
+        // Compact (precision ≤ 18): from unscaled long
+        let compact = Decimal::from_unscaled_long(12345, 10, 2).unwrap();
+        assert_eq!(compact.precision(), 10);
+        assert_eq!(compact.scale(), 2);
+        assert!(compact.is_compact());
+        assert_eq!(compact.to_unscaled_long().unwrap(), 12345);
+        assert_eq!(compact.to_big_decimal().to_string(), "123.45");
+
+        // Non-compact (precision > 18): from BigDecimal
+        let bd = BigDecimal::new(BigInt::from(12345), 0);
+        let non_compact = Decimal::from_big_decimal(bd, 28, 0).unwrap();
+        assert_eq!(non_compact.precision(), 28);
+        assert!(!non_compact.is_compact());
+        assert_eq!(
+            non_compact.to_unscaled_bytes(),
+            BigInt::from(12345).to_signed_bytes_be()
+        );
+
+        // Test compact boundary
+        assert!(Decimal::is_compact_precision(18));
+        assert!(!Decimal::is_compact_precision(19));
+
+        // Test rounding during creation
+        let bd = BigDecimal::new(BigInt::from(12345), 3); // 12.345
+        let rounded = Decimal::from_big_decimal(bd, 10, 2).unwrap();
+        assert_eq!(rounded.to_unscaled_long().unwrap(), 1235); // 12.35
+    }
+
+    /// Test serialization round-trip (unscaled bytes)
+    #[test]
+    fn test_serialization_roundtrip() {
+        // Compact decimal
+        let bd1 = BigDecimal::new(BigInt::from(1314567890123i64), 5); // 
13145678.90123
+        let decimal1 = Decimal::from_big_decimal(bd1.clone(), 15, 5).unwrap();
+        let (unscaled1, _) = bd1.as_bigint_and_exponent();
+        let from_bytes1 =
+            Decimal::from_unscaled_bytes(&unscaled1.to_signed_bytes_be(), 15, 
5).unwrap();
+        assert_eq!(from_bytes1, decimal1);
+        assert_eq!(
+            from_bytes1.to_unscaled_bytes(),
+            unscaled1.to_signed_bytes_be()
+        );
+
+        // Non-compact decimal
+        let bd2 = BigDecimal::new(BigInt::from(12345678900987654321i128), 10);
+        let decimal2 = Decimal::from_big_decimal(bd2.clone(), 23, 10).unwrap();
+        let (unscaled2, _) = bd2.as_bigint_and_exponent();
+        let from_bytes2 =
+            Decimal::from_unscaled_bytes(&unscaled2.to_signed_bytes_be(), 23, 
10).unwrap();
+        assert_eq!(from_bytes2, decimal2);
+        assert_eq!(
+            from_bytes2.to_unscaled_bytes(),
+            unscaled2.to_signed_bytes_be()
+        );
+    }
+
+    /// Test numeric equality and ordering (matches Java semantics)
+    #[test]
+    fn test_equality_and_ordering() {
+        // Same value, different precision/scale → should be equal (numeric 
equality)
+        let d1 = Decimal::from_big_decimal(BigDecimal::new(BigInt::from(10), 
1), 2, 1).unwrap(); // 1.0
+        let d2 = Decimal::from_big_decimal(BigDecimal::new(BigInt::from(100), 
2), 3, 2).unwrap(); // 1.00
+        assert_eq!(d1, d2, "Numeric equality: 1.0 == 1.00");
+        assert_eq!(d1.cmp(&d2), std::cmp::Ordering::Equal);
+
+        // Test ordering with positive values
+        let small = Decimal::from_unscaled_long(10, 5, 0).unwrap();
+        let large = Decimal::from_unscaled_long(15, 5, 0).unwrap();
+        assert!(small < large);
+        assert_eq!(small.cmp(&large), std::cmp::Ordering::Less);
+
+        // Test ordering with negative values
+        let negative_large = Decimal::from_unscaled_long(-10, 5, 0).unwrap(); 
// -10
+        let negative_small = Decimal::from_unscaled_long(-15, 5, 0).unwrap(); 
// -15
+        assert!(negative_small < negative_large); // -15 < -10
+        assert_eq!(
+            negative_small.cmp(&negative_large),
+            std::cmp::Ordering::Less
+        );
+
+        // Test ordering with mixed positive and negative
+        let positive = Decimal::from_unscaled_long(5, 5, 0).unwrap();
+        let negative = Decimal::from_unscaled_long(-5, 5, 0).unwrap();
+        assert!(negative < positive);
+        assert_eq!(negative.cmp(&positive), std::cmp::Ordering::Less);
+
+        // Test clone and round-trip equality
+        let original = Decimal::from_unscaled_long(10, 5, 0).unwrap();
+        assert_eq!(original.clone(), original);
+        assert_eq!(
+            Decimal::from_unscaled_long(original.to_unscaled_long().unwrap(), 
5, 0).unwrap(),
+            original
+        );
+    }
+
+    /// Test hash/equals contract (Rust implementation is correct, unlike Java)
+    #[test]
+    fn test_hash_equals_contract() {
+        use std::collections::hash_map::DefaultHasher;
+        use std::hash::{Hash, Hasher};
+
+        let d1 = Decimal::from_big_decimal(BigDecimal::new(BigInt::from(10), 
1), 2, 1).unwrap(); // 1.0
+        let d2 = Decimal::from_big_decimal(BigDecimal::new(BigInt::from(100), 
2), 3, 2).unwrap(); // 1.00
+
+        // Numeric equality
+        assert_eq!(d1, d2);
+
+        // Hash contract: if a == b, then hash(a) == hash(b)
+        let mut hasher1 = DefaultHasher::new();
+        d1.hash(&mut hasher1);
+        let hash1 = hasher1.finish();
+
+        let mut hasher2 = DefaultHasher::new();
+        d2.hash(&mut hasher2);
+        let hash2 = hasher2.finish();
+
+        assert_eq!(hash1, hash2, "Equal decimals must have equal hashes");
+
+        // Verify HashMap works correctly (this would fail in Java due to 
their hash/equals bug)
+        let mut map = std::collections::HashMap::new();
+        map.insert(d1.clone(), "value");
+        assert_eq!(map.get(&d2), Some(&"value"));
+    }
+
+    /// Test edge cases: zeros, large numbers, rescaling
+    #[test]
+    fn test_edge_cases() {
+        // Zero handling (compact and non-compact)
+        let zero_compact = Decimal::from_unscaled_long(0, 5, 2).unwrap();
+        assert_eq!(
+            zero_compact.to_big_decimal(),
+            BigDecimal::new(BigInt::from(0), 2)
+        );
+
+        let zero_non_compact =
+            Decimal::from_big_decimal(BigDecimal::new(BigInt::from(0), 2), 20, 
2).unwrap();
+        assert_eq!(
+            zero_non_compact.to_big_decimal(),
+            BigDecimal::new(BigInt::from(0), 2)
+        );
+
+        // Large number (39 digits)
+        let large_bd = 
BigDecimal::from_str("123456789012345678901234567890123456789").unwrap();
+        let large = Decimal::from_big_decimal(large_bd, 39, 0).unwrap();
+        let double_val = 
large.to_big_decimal().to_string().parse::<f64>().unwrap();
+        assert!((double_val - 1.2345678901234568E38).abs() < 0.01);
+
+        // Rescaling: 5.0 (scale=1) → 5.00 (scale=2)
+        let d1 = Decimal::from_big_decimal(BigDecimal::new(BigInt::from(50), 
1), 10, 1).unwrap();
+        let d2 = Decimal::from_big_decimal(d1.to_big_decimal(), 10, 
2).unwrap();
+        assert_eq!(d2.to_big_decimal().to_string(), "5.00");
+        assert_eq!(d2.scale(), 2);
+    }
+}
diff --git a/crates/fluss/src/row/encode/compacted_key_encoder.rs 
b/crates/fluss/src/row/encode/compacted_key_encoder.rs
index ebe3da2..563c1c9 100644
--- a/crates/fluss/src/row/encode/compacted_key_encoder.rs
+++ b/crates/fluss/src/row/encode/compacted_key_encoder.rs
@@ -238,86 +238,121 @@ mod tests {
     }
 
     #[test]
-    fn test_all_data_types() {
+    fn test_all_data_types_java_compatible() {
+        // Test encoding compatibility with Java using reference from:
+        // 
https://github.com/apache/fluss/blob/main/fluss-common/src/test/resources/encoding/encoded_key.hex
+        use crate::metadata::{DataType, TimestampLTzType, TimestampType};
+
         let row_type = RowType::with_data_types(vec![
-            DataTypes::boolean(),
-            DataTypes::tinyint(),
-            DataTypes::smallint(),
-            DataTypes::int(),
-            DataTypes::bigint(),
-            DataTypes::float(),
-            DataTypes::double(),
-            // TODO Date
-            // TODO Time
-            DataTypes::binary(20),
-            DataTypes::bytes(),
-            DataTypes::char(2),
-            DataTypes::string(),
-            // TODO Decimal
-            // TODO Timestamp
-            // TODO Timestamp LTZ
-            // TODO Array of Int
-            // TODO Array of Float
-            // TODO Array of String
-            // TODO: Add Map and Row fields in Issue #1973
+            DataTypes::boolean(),                                              
   // BOOLEAN
+            DataTypes::tinyint(),                                              
   // TINYINT
+            DataTypes::smallint(),                                             
   // SMALLINT
+            DataTypes::int(),                                                  
   // INT
+            DataTypes::bigint(),                                               
   // BIGINT
+            DataTypes::float(),                                                
   // FLOAT
+            DataTypes::double(),                                               
   // DOUBLE
+            DataTypes::date(),                                                 
   // DATE
+            DataTypes::time(),                                                 
   // TIME
+            DataTypes::binary(20),                                             
   // BINARY(20)
+            DataTypes::bytes(),                                                
   // BYTES
+            DataTypes::char(2),                                                
   // CHAR(2)
+            DataTypes::string(),                                               
   // STRING
+            DataTypes::decimal(5, 2),                                          
   // DECIMAL(5,2)
+            DataTypes::decimal(20, 0),                                         
   // DECIMAL(20,0)
+            DataType::Timestamp(TimestampType::with_nullable(false, 
1).unwrap()), // TIMESTAMP(1)
+            DataType::Timestamp(TimestampType::with_nullable(false, 
5).unwrap()), // TIMESTAMP(5)
+            DataType::TimestampLTz(TimestampLTzType::with_nullable(false, 
1).unwrap()), // TIMESTAMP_LTZ(1)
+            DataType::TimestampLTz(TimestampLTzType::with_nullable(false, 
5).unwrap()), // TIMESTAMP_LTZ(5)
+                                                                               
         // TODO: Add support for ARRAY type
+                                                                               
         // TODO: Add support for MAP type
+                                                                               
         // TODO: Add support for ROW type
         ]);
 
+        // Exact values from Java's IndexedRowTest.genRecordForAllTypes()
         let row = GenericRow::from_data(vec![
-            Datum::from(true),
-            Datum::from(2i8),
-            Datum::from(10i16),
-            Datum::from(100i32),
-            Datum::from(-6101065172474983726i64), // from Java test case: new 
BigInteger("12345678901234567890").longValue()
-            Datum::from(13.2f32),
-            Datum::from(15.21f64),
-            // TODO Date
-            // TODO Time
-            Datum::from("1234567890".as_bytes()),
-            Datum::from("20".as_bytes()),
-            Datum::from("1"),
-            Datum::from("hello"),
-            // TODO Decimal
-            // TODO Timestamp
-            // TODO Timestamp LTZ
-            // TODO Array of Int
-            // TODO Array of Float
-            // TODO Array of String
-            // TODO: Add Map and Row fields in Issue #1973
+            Datum::from(true),                                   // BOOLEAN: 
true
+            Datum::from(2i8),                                    // TINYINT: 2
+            Datum::from(10i16),                                  // SMALLINT: 
10
+            Datum::from(100i32),                                 // INT: 100
+            Datum::from(-6101065172474983726i64),                // BIGINT
+            Datum::from(13.2f32),                                // FLOAT: 13.2
+            Datum::from(15.21f64),                               // DOUBLE: 
15.21
+            Datum::Date(crate::row::datum::Date::new(19655)), // DATE: 
2023-10-25 (19655 days since epoch)
+            Datum::Time(crate::row::datum::Time::new(34200000)), // TIME: 
09:30:00.0
+            Datum::from("1234567890".as_bytes()),             // BINARY(20)
+            Datum::from("20".as_bytes()),                     // BYTES
+            Datum::from("1"),                                 // CHAR(2): "1"
+            Datum::from("hello"),                             // STRING: 
"hello"
+            Datum::Decimal(crate::row::Decimal::from_unscaled_long(9, 5, 
2).unwrap()), // DECIMAL(5,2)
+            Datum::Decimal(
+                crate::row::Decimal::from_big_decimal(
+                    
bigdecimal::BigDecimal::new(bigdecimal::num_bigint::BigInt::from(10), 0),
+                    20,
+                    0,
+                )
+                .unwrap(),
+            ), // DECIMAL(20,0)
+            
Datum::TimestampNtz(crate::row::datum::TimestampNtz::new(1698235273182)), // 
TIMESTAMP(1)
+            
Datum::TimestampNtz(crate::row::datum::TimestampNtz::new(1698235273182)), // 
TIMESTAMP(5)
+            
Datum::TimestampLtz(crate::row::datum::TimestampLtz::new(1698235273182)), // 
TIMESTAMP_LTZ(1)
+            
Datum::TimestampLtz(crate::row::datum::TimestampLtz::new(1698235273182)), // 
TIMESTAMP_LTZ(5)
         ]);
 
-        let mut encoder = for_test_row_type(&row_type);
-
-        let mut expected: Vec<u8> = Vec::new();
-        // BOOLEAN: true
-        expected.extend(vec![0x01]);
-        // TINYINT: 2
-        expected.extend(vec![0x02]);
-        // SMALLINT: 10
-        expected.extend(vec![0x0A]);
-        // INT: 100
-        expected.extend(vec![0x00, 0x64]);
-        // BIGINT: -6101065172474983726
-        expected.extend(vec![
+        // Expected bytes from Java's encoded_key.hex reference file
+        #[rustfmt::skip]
+        let expected: Vec<u8> = vec![
+            // BOOLEAN: true
+            0x01,
+            // TINYINT: 2
+            0x02,
+            // SMALLINT: 10 (varint encoded)
+            0x0A,
+            // INT: 100 (varint encoded)
+            0x00, 0x64,
+            // BIGINT: -6101065172474983726
             0xD2, 0x95, 0xFC, 0xD8, 0xCE, 0xB1, 0xAA, 0xAA, 0xAB, 0x01,
-        ]);
-        // FLOAT: 13.2
-        expected.extend(vec![0x33, 0x33, 0x53, 0x41]);
-        // DOUBLE: 15.21
-        expected.extend(vec![0xEC, 0x51, 0xB8, 0x1E, 0x85, 0x6B, 0x2E, 0x40]);
-        // BINARY(20): "1234567890".getBytes()
-        expected.extend(vec![
+            // FLOAT: 13.2
+            0x33, 0x33, 0x53, 0x41,
+            // DOUBLE: 15.21
+            0xEC, 0x51, 0xB8, 0x1E, 0x85, 0x6B, 0x2E, 0x40,
+            // DATE: 2023-10-25
+            0xC7, 0x99, 0x01,
+            // TIME: 09:30:00.0
+            0xC0, 0xB3, 0xA7, 0x10,
+            // BINARY(20): "1234567890"
             0x0A, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x30,
-        ]);
+            // BYTES: "20"
+            0x02, 0x32, 0x30,
+            // CHAR(2): "1"
+            0x01, 0x31,
+            // STRING: "hello"
+            0x05, 0x68, 0x65, 0x6C, 0x6C, 0x6F,
+            // DECIMAL(5,2): 9
+            0x09,
+            // DECIMAL(20,0): 10
+            0x01, 0x0A,
+            // TIMESTAMP(1): 1698235273182
+            0xDE, 0x9F, 0xD7, 0xB5, 0xB6, 0x31,
+            // TIMESTAMP(5): 1698235273182
+            0xDE, 0x9F, 0xD7, 0xB5, 0xB6, 0x31, 0x00,
+            // TIMESTAMP_LTZ(1): 1698235273182
+            0xDE, 0x9F, 0xD7, 0xB5, 0xB6, 0x31,
+            // TIMESTAMP_LTZ(5): 1698235273182
+            0xDE, 0x9F, 0xD7, 0xB5, 0xB6, 0x31, 0x00,
+        ];
 
-        // BYTES: "20".getBytes()
-        expected.extend(vec![0x02, 0x32, 0x30]);
-        // CHAR(2): "1"
-        expected.extend(vec![0x01, 0x31]);
-        // STRING: String: "hello"
-        expected.extend(vec![0x05, 0x68, 0x65, 0x6C, 0x6C, 0x6F]);
+        let mut encoder = for_test_row_type(&row_type);
+        let encoded = encoder.encode_key(&row).unwrap();
+
+        // Assert byte-for-byte compatibility with Java's encoded_key.hex
         assert_eq!(
-            encoder.encode_key(&row).unwrap().iter().as_slice(),
-            expected.as_slice()
+            encoded.iter().as_slice(),
+            expected.as_slice(),
+            "\n\nRust encoding does not match Java reference from 
encoded_key.hex\n\
+             Expected: {:02X?}\n\
+             Actual:   {:02X?}\n",
+            expected,
+            encoded.iter().as_slice()
         );
     }
 }
diff --git a/crates/fluss/src/row/field_getter.rs 
b/crates/fluss/src/row/field_getter.rs
index 97f9e39..cbffa4d 100644
--- a/crates/fluss/src/row/field_getter.rs
+++ b/crates/fluss/src/row/field_getter.rs
@@ -66,6 +66,21 @@ impl FieldGetter {
             DataType::BigInt(_) => InnerFieldGetter::BigInt { pos },
             DataType::Float(_) => InnerFieldGetter::Float { pos },
             DataType::Double(_) => InnerFieldGetter::Double { pos },
+            DataType::Decimal(decimal_type) => InnerFieldGetter::Decimal {
+                pos,
+                precision: decimal_type.precision() as usize,
+                scale: decimal_type.scale() as usize,
+            },
+            DataType::Date(_) => InnerFieldGetter::Date { pos },
+            DataType::Time(_) => InnerFieldGetter::Time { pos },
+            DataType::Timestamp(t) => InnerFieldGetter::Timestamp {
+                pos,
+                precision: t.precision(),
+            },
+            DataType::TimestampLTz(t) => InnerFieldGetter::TimestampLtz {
+                pos,
+                precision: t.precision(),
+            },
             _ => unimplemented!("DataType {:?} is currently unimplemented", 
data_type),
         };
 
@@ -79,17 +94,60 @@ impl FieldGetter {
 
 #[derive(Clone)]
 pub enum InnerFieldGetter {
-    Char { pos: usize, len: usize },
-    String { pos: usize },
-    Bool { pos: usize },
-    Binary { pos: usize, len: usize },
-    Bytes { pos: usize },
-    TinyInt { pos: usize },
-    SmallInt { pos: usize },
-    Int { pos: usize },
-    BigInt { pos: usize },
-    Float { pos: usize },
-    Double { pos: usize },
+    Char {
+        pos: usize,
+        len: usize,
+    },
+    String {
+        pos: usize,
+    },
+    Bool {
+        pos: usize,
+    },
+    Binary {
+        pos: usize,
+        len: usize,
+    },
+    Bytes {
+        pos: usize,
+    },
+    TinyInt {
+        pos: usize,
+    },
+    SmallInt {
+        pos: usize,
+    },
+    Int {
+        pos: usize,
+    },
+    BigInt {
+        pos: usize,
+    },
+    Float {
+        pos: usize,
+    },
+    Double {
+        pos: usize,
+    },
+    Decimal {
+        pos: usize,
+        precision: usize,
+        scale: usize,
+    },
+    Date {
+        pos: usize,
+    },
+    Time {
+        pos: usize,
+    },
+    Timestamp {
+        pos: usize,
+        precision: u32,
+    },
+    TimestampLtz {
+        pos: usize,
+        precision: u32,
+    },
 }
 
 impl InnerFieldGetter {
@@ -106,7 +164,19 @@ impl InnerFieldGetter {
             InnerFieldGetter::BigInt { pos } => 
Datum::from(row.get_long(*pos)),
             InnerFieldGetter::Float { pos } => 
Datum::from(row.get_float(*pos)),
             InnerFieldGetter::Double { pos } => 
Datum::from(row.get_double(*pos)),
-            //TODO Decimal, Date, Time, Timestamp, TimestampLTZ, Array, Map, 
Row
+            InnerFieldGetter::Decimal {
+                pos,
+                precision,
+                scale,
+            } => Datum::Decimal(row.get_decimal(*pos, *precision, *scale)),
+            InnerFieldGetter::Date { pos } => Datum::Date(row.get_date(*pos)),
+            InnerFieldGetter::Time { pos } => Datum::Time(row.get_time(*pos)),
+            InnerFieldGetter::Timestamp { pos, precision } => {
+                Datum::TimestampNtz(row.get_timestamp_ntz(*pos, *precision))
+            }
+            InnerFieldGetter::TimestampLtz { pos, precision } => {
+                Datum::TimestampLtz(row.get_timestamp_ltz(*pos, *precision))
+            } //TODO Array, Map, Row
         }
     }
 
@@ -122,7 +192,12 @@ impl InnerFieldGetter {
             | Self::Int { pos }
             | Self::BigInt { pos }
             | Self::Float { pos, .. }
-            | Self::Double { pos } => *pos,
+            | Self::Double { pos }
+            | Self::Decimal { pos, .. }
+            | Self::Date { pos }
+            | Self::Time { pos }
+            | Self::Timestamp { pos, .. }
+            | Self::TimestampLtz { pos, .. } => *pos,
         }
     }
 }
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 536409e..d2f640e 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -18,6 +18,7 @@
 mod column;
 
 mod datum;
+mod decimal;
 
 pub mod binary;
 pub mod compacted;
@@ -28,6 +29,7 @@ mod row_decoder;
 pub use column::*;
 pub use compacted::CompactedRow;
 pub use datum::*;
+pub use decimal::{Decimal, MAX_COMPACT_PRECISION};
 pub use encode::KeyEncoder;
 pub use row_decoder::{CompactedRowDecoder, RowDecoder, RowDecoderFactory};
 
@@ -71,14 +73,26 @@ pub trait InternalRow {
     /// Returns the string value at the given position
     fn get_string(&self, pos: usize) -> &str;
 
-    // /// Returns the decimal value at the given position
-    // fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> 
Decimal;
+    /// Returns the decimal value at the given position
+    fn get_decimal(&self, pos: usize, precision: usize, scale: usize) -> 
Decimal;
 
-    // /// Returns the timestamp value at the given position
-    // fn get_timestamp_ntz(&self, pos: usize, precision: usize) -> 
TimestampNtz;
+    /// Returns the date value at the given position (date as days since epoch)
+    fn get_date(&self, pos: usize) -> datum::Date;
 
-    // /// Returns the timestamp value at the given position
-    // fn get_timestamp_ltz(&self, pos: usize, precision: usize) -> 
TimestampLtz;
+    /// Returns the time value at the given position (time as milliseconds 
since midnight)
+    fn get_time(&self, pos: usize) -> datum::Time;
+
+    /// Returns the timestamp value at the given position (timestamp without 
timezone)
+    ///
+    /// The precision is required to determine whether the timestamp value was 
stored
+    /// in a compact representation (precision <= 3) or with nanosecond 
precision.
+    fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> 
datum::TimestampNtz;
+
+    /// Returns the timestamp value at the given position (timestamp with 
local timezone)
+    ///
+    /// The precision is required to determine whether the timestamp value was 
stored
+    /// in a compact representation (precision <= 3) or with nanosecond 
precision.
+    fn get_timestamp_ltz(&self, pos: usize, precision: u32) -> 
datum::TimestampLtz;
 
     /// Returns the binary value at the given position with fixed length
     fn get_binary(&self, pos: usize, length: usize) -> &[u8];
@@ -123,6 +137,43 @@ impl<'a> InternalRow for GenericRow<'a> {
         self.values.get(_pos).unwrap().try_into().unwrap()
     }
 
+    fn get_decimal(&self, pos: usize, _precision: usize, _scale: usize) -> 
Decimal {
+        match self.values.get(pos).unwrap() {
+            Datum::Decimal(d) => d.clone(),
+            other => panic!("Expected Decimal at pos {pos:?}, got {other:?}"),
+        }
+    }
+
+    fn get_date(&self, pos: usize) -> datum::Date {
+        match self.values.get(pos).unwrap() {
+            Datum::Date(d) => *d,
+            Datum::Int32(i) => datum::Date::new(*i),
+            other => panic!("Expected Date or Int32 at pos {pos:?}, got 
{other:?}"),
+        }
+    }
+
+    fn get_time(&self, pos: usize) -> datum::Time {
+        match self.values.get(pos).unwrap() {
+            Datum::Time(t) => *t,
+            Datum::Int32(i) => datum::Time::new(*i),
+            other => panic!("Expected Time or Int32 at pos {pos:?}, got 
{other:?}"),
+        }
+    }
+
+    fn get_timestamp_ntz(&self, pos: usize, _precision: u32) -> 
datum::TimestampNtz {
+        match self.values.get(pos).unwrap() {
+            Datum::TimestampNtz(t) => *t,
+            other => panic!("Expected TimestampNtz at pos {pos:?}, got 
{other:?}"),
+        }
+    }
+
+    fn get_timestamp_ltz(&self, pos: usize, _precision: u32) -> 
datum::TimestampLtz {
+        match self.values.get(pos).unwrap() {
+            Datum::TimestampLtz(t) => *t,
+            other => panic!("Expected TimestampLtz at pos {pos:?}, got 
{other:?}"),
+        }
+    }
+
     fn get_float(&self, pos: usize) -> f32 {
         self.values.get(pos).unwrap().try_into().unwrap()
     }

Reply via email to