This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 03f0889d feat: Implement Decimal from/to bytes represents (#665)
03f0889d is described below

commit 03f0889d740ccb3c749347a8af6f2e049a438a20
Author: Xuanwo <[email protected]>
AuthorDate: Fri Dec 13 17:44:39 2024 +0800

    feat: Implement Decimal from/to bytes represents (#665)
---
 Cargo.toml                               |   1 +
 crates/iceberg/Cargo.toml                |   1 +
 crates/iceberg/src/spec/manifest.rs      |  22 +++--
 crates/iceberg/src/spec/manifest_list.rs |  31 +++----
 crates/iceberg/src/spec/values.rs        | 146 +++++++++++++++++++++++++++++--
 5 files changed, 168 insertions(+), 33 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 9a76ff41..c766040d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -71,6 +71,7 @@ itertools = "0.13"
 log = "0.4"
 mockito = "1"
 murmur3 = "0.5.2"
+num-bigint = "0.4.6"
 once_cell = "1"
 opendal = "0.50.1"
 ordered-float = "4"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index f2e6694b..f84e7ab6 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -63,6 +63,7 @@ futures = { workspace = true }
 itertools = { workspace = true }
 moka = { version = "0.12.8", features = ["future"] }
 murmur3 = { workspace = true }
+num-bigint = { workspace = true }
 once_cell = { workspace = true }
 opendal = { workspace = true }
 ordered-float = { workspace = true }
diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs
index a868c7b1..13ecdc2e 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -1346,8 +1346,8 @@ mod _serde {
                 value_counts: Some(to_i64_entry(value.value_counts)?),
                 null_value_counts: 
Some(to_i64_entry(value.null_value_counts)?),
                 nan_value_counts: Some(to_i64_entry(value.nan_value_counts)?),
-                lower_bounds: Some(to_bytes_entry(value.lower_bounds)),
-                upper_bounds: Some(to_bytes_entry(value.upper_bounds)),
+                lower_bounds: Some(to_bytes_entry(value.lower_bounds)?),
+                upper_bounds: Some(to_bytes_entry(value.upper_bounds)?),
                 key_metadata: 
Some(serde_bytes::ByteBuf::from(value.key_metadata)),
                 split_offsets: Some(value.split_offsets),
                 equality_ids: Some(value.equality_ids),
@@ -1451,13 +1451,17 @@ mod _serde {
         Ok(m)
     }
 
-    fn to_bytes_entry(v: impl IntoIterator<Item = (i32, Datum)>) -> 
Vec<BytesEntry> {
-        v.into_iter()
-            .map(|e| BytesEntry {
-                key: e.0,
-                value: e.1.to_bytes(),
-            })
-            .collect()
+    fn to_bytes_entry(v: impl IntoIterator<Item = (i32, Datum)>) -> 
Result<Vec<BytesEntry>, Error> {
+        let iter = v.into_iter();
+        // Reserve the capacity to the lower bound.
+        let mut bs = Vec::with_capacity(iter.size_hint().0);
+        for (k, d) in iter {
+            bs.push(BytesEntry {
+                key: k,
+                value: d.to_bytes()?,
+            });
+        }
+        Ok(bs)
     }
 
     #[derive(Serialize, Deserialize)]
diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs
index 5768b79d..6c898f89 100644
--- a/crates/iceberg/src/spec/manifest_list.rs
+++ b/crates/iceberg/src/spec/manifest_list.rs
@@ -973,21 +973,22 @@ pub(super) mod _serde {
 
     fn convert_to_serde_field_summary(
         partitions: Vec<super::FieldSummary>,
-    ) -> Option<Vec<FieldSummary>> {
+    ) -> Result<Option<Vec<FieldSummary>>> {
         if partitions.is_empty() {
-            None
+            Ok(None)
         } else {
-            Some(
-                partitions
-                    .into_iter()
-                    .map(|v| FieldSummary {
-                        contains_null: v.contains_null,
-                        contains_nan: v.contains_nan,
-                        lower_bound: v.lower_bound.map(|v| v.to_bytes()),
-                        upper_bound: v.upper_bound.map(|v| v.to_bytes()),
-                    })
-                    .collect(),
-            )
+            let mut vs = Vec::with_capacity(partitions.len());
+
+            for v in partitions {
+                let fs = FieldSummary {
+                    contains_null: v.contains_null,
+                    contains_nan: v.contains_nan,
+                    lower_bound: v.lower_bound.map(|v| 
v.to_bytes()).transpose()?,
+                    upper_bound: v.upper_bound.map(|v| 
v.to_bytes()).transpose()?,
+                };
+                vs.push(fs);
+            }
+            Ok(Some(vs))
         }
     }
 
@@ -1003,7 +1004,7 @@ pub(super) mod _serde {
         type Error = Error;
 
         fn try_from(value: ManifestFile) -> std::result::Result<Self, 
Self::Error> {
-            let partitions = convert_to_serde_field_summary(value.partitions);
+            let partitions = convert_to_serde_field_summary(value.partitions)?;
             let key_metadata = 
convert_to_serde_key_metadata(value.key_metadata);
             Ok(Self {
                 manifest_path: value.manifest_path,
@@ -1077,7 +1078,7 @@ pub(super) mod _serde {
         type Error = Error;
 
         fn try_from(value: ManifestFile) -> std::result::Result<Self, 
Self::Error> {
-            let partitions = convert_to_serde_field_summary(value.partitions);
+            let partitions = convert_to_serde_field_summary(value.partitions)?;
             let key_metadata = 
convert_to_serde_key_metadata(value.key_metadata);
             Ok(Self {
                 manifest_path: value.manifest_path,
diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs
index 06acdc9e..38326b80 100644
--- a/crates/iceberg/src/spec/values.rs
+++ b/crates/iceberg/src/spec/values.rs
@@ -29,7 +29,9 @@ use std::str::FromStr;
 pub use _serde::RawLiteral;
 use bitvec::vec::BitVec;
 use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
+use num_bigint::BigInt;
 use ordered_float::OrderedFloat;
+use rust_decimal::prelude::ToPrimitive;
 use rust_decimal::Decimal;
 use serde::de::{
     MapAccess, {self},
@@ -422,10 +424,15 @@ impl Datum {
             }
             PrimitiveType::Fixed(_) => 
PrimitiveLiteral::Binary(Vec::from(bytes)),
             PrimitiveType::Binary => 
PrimitiveLiteral::Binary(Vec::from(bytes)),
-            PrimitiveType::Decimal {
-                precision: _,
-                scale: _,
-            } => todo!(),
+            PrimitiveType::Decimal { .. } => {
+                let unscaled_value = BigInt::from_signed_bytes_be(bytes);
+                
PrimitiveLiteral::Int128(unscaled_value.to_i128().ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Can't convert bytes to i128: {:?}", bytes),
+                    )
+                })?)
+            }
         };
         Ok(Datum::new(data_type, literal))
     }
@@ -433,8 +440,8 @@ impl Datum {
     /// Convert the value to bytes
     ///
     /// See [this 
spec](https://iceberg.apache.org/spec/#binary-single-value-serialization) for 
reference.
-    pub fn to_bytes(&self) -> ByteBuf {
-        match &self.literal {
+    pub fn to_bytes(&self) -> Result<ByteBuf> {
+        let buf = match &self.literal {
             PrimitiveLiteral::Boolean(val) => {
                 if *val {
                     ByteBuf::from([1u8])
@@ -449,8 +456,42 @@ impl Datum {
             PrimitiveLiteral::String(val) => ByteBuf::from(val.as_bytes()),
             PrimitiveLiteral::UInt128(val) => ByteBuf::from(val.to_be_bytes()),
             PrimitiveLiteral::Binary(val) => ByteBuf::from(val.as_slice()),
-            PrimitiveLiteral::Int128(_) => todo!(),
-        }
+            PrimitiveLiteral::Int128(val) => {
+                let PrimitiveType::Decimal { precision, .. } = self.r#type 
else {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "PrimitiveLiteral Int128 must be PrimitiveType 
Decimal but got {}",
+                            &self.r#type
+                        ),
+                    ));
+                };
+
+                // It's required by iceberg spec that we must keep the minimum
+                // number of bytes for the value
+                let Ok(required_bytes) = 
Type::decimal_required_bytes(precision) else {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "PrimitiveType Decimal must has valid precision 
but got {}",
+                            precision
+                        ),
+                    ));
+                };
+
+                // The primitive literal is unscaled value.
+                let unscaled_value = BigInt::from(*val);
+                // Convert into two's-complement byte representation of the 
BigInt
+                // in big-endian byte order.
+                let mut bytes = unscaled_value.to_signed_bytes_be();
+                // Truncate with required bytes to make sure.
+                bytes.truncate(required_bytes as usize);
+
+                ByteBuf::from(bytes)
+            }
+        };
+
+        Ok(buf)
     }
 
     /// Creates a boolean value.
@@ -1012,6 +1053,46 @@ impl Datum {
         }
     }
 
+    /// Try to create a decimal literal from [`Decimal`] with precision.
+    ///
+    /// Example:
+    ///
+    /// ```rust
+    /// use iceberg::spec::Datum;
+    /// use rust_decimal::Decimal;
+    ///
+    /// let t = Datum::decimal_with_precision(Decimal::new(123, 2), 
30).unwrap();
+    ///
+    /// assert_eq!(&format!("{t}"), "1.23");
+    /// ```
+    pub fn decimal_with_precision(value: impl Into<Decimal>, precision: u32) 
-> Result<Self> {
+        let decimal = value.into();
+        let scale = decimal.scale();
+
+        let available_bytes = Type::decimal_required_bytes(precision)? as 
usize;
+        let unscaled_value = BigInt::from(decimal.mantissa());
+        let actual_bytes = unscaled_value.to_signed_bytes_be();
+        if actual_bytes.len() > available_bytes {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Decimal value {} is too large for precision {}",
+                    decimal, precision
+                ),
+            ));
+        }
+
+        let r#type = Type::decimal(precision, scale)?;
+        if let Type::Primitive(p) = r#type {
+            Ok(Self {
+                r#type: p,
+                literal: PrimitiveLiteral::Int128(decimal.mantissa()),
+            })
+        } else {
+            unreachable!("Decimal type must be primitive.")
+        }
+    }
+
     /// Convert the datum to `target_type`.
     pub fn to(self, target_type: &Type) -> Result<Datum> {
         match target_type {
@@ -2729,7 +2810,7 @@ mod tests {
         assert_eq!(datum, expected_datum);
 
         let mut writer = apache_avro::Writer::new(&schema, Vec::new());
-        writer.append_ser(datum.to_bytes()).unwrap();
+        writer.append_ser(datum.to_bytes().unwrap()).unwrap();
         let encoded = writer.into_inner().unwrap();
         let reader = apache_avro::Reader::with_schema(&schema, 
&*encoded).unwrap();
 
@@ -3045,6 +3126,53 @@ mod tests {
         check_avro_bytes_serde(bytes, Datum::string("iceberg"), 
&PrimitiveType::String);
     }
 
+    #[test]
+    fn avro_bytes_decimal() {
+        // (input_bytes, decimal_num, expect_scale, expect_precision)
+        let cases = vec![
+            (vec![4u8, 210u8], 1234, 2, 38),
+            (vec![251u8, 46u8], -1234, 2, 38),
+            (vec![4u8, 210u8], 1234, 3, 38),
+            (vec![251u8, 46u8], -1234, 3, 38),
+            (vec![42u8], 42, 2, 1),
+            (vec![214u8], -42, 2, 1),
+        ];
+
+        for (input_bytes, decimal_num, expect_scale, expect_precision) in 
cases {
+            check_avro_bytes_serde(
+                input_bytes,
+                Datum::decimal_with_precision(
+                    Decimal::new(decimal_num, expect_scale),
+                    expect_precision,
+                )
+                .unwrap(),
+                &PrimitiveType::Decimal {
+                    precision: expect_precision,
+                    scale: expect_scale,
+                },
+            );
+        }
+    }
+
+    #[test]
+    fn avro_bytes_decimal_expect_error() {
+        // (decimal_num, expect_scale, expect_precision)
+        let cases = vec![(1234, 2, 1)];
+
+        for (decimal_num, expect_scale, expect_precision) in cases {
+            let result = Datum::decimal_with_precision(
+                Decimal::new(decimal_num, expect_scale),
+                expect_precision,
+            );
+            assert!(result.is_err(), "expect error but got {:?}", result);
+            assert_eq!(
+                result.unwrap_err().kind(),
+                ErrorKind::DataInvalid,
+                "expect error DataInvalid",
+            );
+        }
+    }
+
     #[test]
     fn avro_convert_test_int() {
         check_convert_with_avro(

Reply via email to