This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 03f0889d feat: Implement Decimal from/to bytes representation (#665)
03f0889d is described below
commit 03f0889d740ccb3c749347a8af6f2e049a438a20
Author: Xuanwo <[email protected]>
AuthorDate: Fri Dec 13 17:44:39 2024 +0800
feat: Implement Decimal from/to bytes representation (#665)
---
Cargo.toml | 1 +
crates/iceberg/Cargo.toml | 1 +
crates/iceberg/src/spec/manifest.rs | 22 +++--
crates/iceberg/src/spec/manifest_list.rs | 31 +++----
crates/iceberg/src/spec/values.rs | 146 +++++++++++++++++++++++++++++--
5 files changed, 168 insertions(+), 33 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 9a76ff41..c766040d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -71,6 +71,7 @@ itertools = "0.13"
log = "0.4"
mockito = "1"
murmur3 = "0.5.2"
+num-bigint = "0.4.6"
once_cell = "1"
opendal = "0.50.1"
ordered-float = "4"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index f2e6694b..f84e7ab6 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -63,6 +63,7 @@ futures = { workspace = true }
itertools = { workspace = true }
moka = { version = "0.12.8", features = ["future"] }
murmur3 = { workspace = true }
+num-bigint = { workspace = true }
once_cell = { workspace = true }
opendal = { workspace = true }
ordered-float = { workspace = true }
diff --git a/crates/iceberg/src/spec/manifest.rs
b/crates/iceberg/src/spec/manifest.rs
index a868c7b1..13ecdc2e 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -1346,8 +1346,8 @@ mod _serde {
value_counts: Some(to_i64_entry(value.value_counts)?),
null_value_counts:
Some(to_i64_entry(value.null_value_counts)?),
nan_value_counts: Some(to_i64_entry(value.nan_value_counts)?),
- lower_bounds: Some(to_bytes_entry(value.lower_bounds)),
- upper_bounds: Some(to_bytes_entry(value.upper_bounds)),
+ lower_bounds: Some(to_bytes_entry(value.lower_bounds)?),
+ upper_bounds: Some(to_bytes_entry(value.upper_bounds)?),
key_metadata:
Some(serde_bytes::ByteBuf::from(value.key_metadata)),
split_offsets: Some(value.split_offsets),
equality_ids: Some(value.equality_ids),
@@ -1451,13 +1451,17 @@ mod _serde {
Ok(m)
}
- fn to_bytes_entry(v: impl IntoIterator<Item = (i32, Datum)>) ->
Vec<BytesEntry> {
- v.into_iter()
- .map(|e| BytesEntry {
- key: e.0,
- value: e.1.to_bytes(),
- })
- .collect()
+ fn to_bytes_entry(v: impl IntoIterator<Item = (i32, Datum)>) ->
Result<Vec<BytesEntry>, Error> {
+ let iter = v.into_iter();
+ // Pre-size from the iterator's lower size_hint bound; the `?` in the loop may still return early.
+ let mut bs = Vec::with_capacity(iter.size_hint().0);
+ for (k, d) in iter {
+ bs.push(BytesEntry {
+ key: k,
+ value: d.to_bytes()?,
+ });
+ }
+ Ok(bs)
}
#[derive(Serialize, Deserialize)]
diff --git a/crates/iceberg/src/spec/manifest_list.rs
b/crates/iceberg/src/spec/manifest_list.rs
index 5768b79d..6c898f89 100644
--- a/crates/iceberg/src/spec/manifest_list.rs
+++ b/crates/iceberg/src/spec/manifest_list.rs
@@ -973,21 +973,22 @@ pub(super) mod _serde {
fn convert_to_serde_field_summary(
partitions: Vec<super::FieldSummary>,
- ) -> Option<Vec<FieldSummary>> {
+ ) -> Result<Option<Vec<FieldSummary>>> {
if partitions.is_empty() {
- None
+ Ok(None)
} else {
- Some(
- partitions
- .into_iter()
- .map(|v| FieldSummary {
- contains_null: v.contains_null,
- contains_nan: v.contains_nan,
- lower_bound: v.lower_bound.map(|v| v.to_bytes()),
- upper_bound: v.upper_bound.map(|v| v.to_bytes()),
- })
- .collect(),
- )
+ let mut vs = Vec::with_capacity(partitions.len());
+ // Convert each summary; a to_bytes() error on either bound aborts the whole conversion via `?`.
+ for v in partitions {
+ let fs = FieldSummary {
+ contains_null: v.contains_null,
+ contains_nan: v.contains_nan,
+ lower_bound: v.lower_bound.map(|v|
v.to_bytes()).transpose()?,
+ upper_bound: v.upper_bound.map(|v|
v.to_bytes()).transpose()?,
+ };
+ vs.push(fs);
+ }
+ Ok(Some(vs))
}
}
@@ -1003,7 +1004,7 @@ pub(super) mod _serde {
type Error = Error;
fn try_from(value: ManifestFile) -> std::result::Result<Self,
Self::Error> {
- let partitions = convert_to_serde_field_summary(value.partitions);
+ let partitions = convert_to_serde_field_summary(value.partitions)?;
let key_metadata =
convert_to_serde_key_metadata(value.key_metadata);
Ok(Self {
manifest_path: value.manifest_path,
@@ -1077,7 +1078,7 @@ pub(super) mod _serde {
type Error = Error;
fn try_from(value: ManifestFile) -> std::result::Result<Self,
Self::Error> {
- let partitions = convert_to_serde_field_summary(value.partitions);
+ let partitions = convert_to_serde_field_summary(value.partitions)?;
let key_metadata =
convert_to_serde_key_metadata(value.key_metadata);
Ok(Self {
manifest_path: value.manifest_path,
diff --git a/crates/iceberg/src/spec/values.rs
b/crates/iceberg/src/spec/values.rs
index 06acdc9e..38326b80 100644
--- a/crates/iceberg/src/spec/values.rs
+++ b/crates/iceberg/src/spec/values.rs
@@ -29,7 +29,9 @@ use std::str::FromStr;
pub use _serde::RawLiteral;
use bitvec::vec::BitVec;
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
+use num_bigint::BigInt;
use ordered_float::OrderedFloat;
+use rust_decimal::prelude::ToPrimitive;
use rust_decimal::Decimal;
use serde::de::{
MapAccess, {self},
@@ -422,10 +424,15 @@ impl Datum {
}
PrimitiveType::Fixed(_) =>
PrimitiveLiteral::Binary(Vec::from(bytes)),
PrimitiveType::Binary =>
PrimitiveLiteral::Binary(Vec::from(bytes)),
- PrimitiveType::Decimal {
- precision: _,
- scale: _,
- } => todo!(),
+ PrimitiveType::Decimal { .. } => {
+ let unscaled_value = BigInt::from_signed_bytes_be(bytes);
+
PrimitiveLiteral::Int128(unscaled_value.to_i128().ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Can't convert bytes to i128: {:?}", bytes),
+ )
+ })?)
+ }
};
Ok(Datum::new(data_type, literal))
}
@@ -433,8 +440,8 @@ impl Datum {
/// Convert the value to bytes
///
/// See [this
spec](https://iceberg.apache.org/spec/#binary-single-value-serialization) for
reference.
- pub fn to_bytes(&self) -> ByteBuf {
- match &self.literal {
+ pub fn to_bytes(&self) -> Result<ByteBuf> {
+ let buf = match &self.literal {
PrimitiveLiteral::Boolean(val) => {
if *val {
ByteBuf::from([1u8])
@@ -449,8 +456,42 @@ impl Datum {
PrimitiveLiteral::String(val) => ByteBuf::from(val.as_bytes()),
PrimitiveLiteral::UInt128(val) => ByteBuf::from(val.to_be_bytes()),
PrimitiveLiteral::Binary(val) => ByteBuf::from(val.as_slice()),
- PrimitiveLiteral::Int128(_) => todo!(),
- }
+ PrimitiveLiteral::Int128(val) => {
+ let PrimitiveType::Decimal { precision, .. } = self.r#type
else {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "PrimitiveLiteral Int128 must be PrimitiveType
Decimal but got {}",
+ &self.r#type
+ ),
+ ));
+ };
+
+ // `required_bytes` is the widest encoding this precision allows (an upper bound);
+ // the minimal-length encoding presumably comes from `to_signed_bytes_be` below.
+ let Ok(required_bytes) =
Type::decimal_required_bytes(precision) else {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "PrimitiveType Decimal must has valid precision
but got {}",
+ precision
+ ),
+ ));
+ };
+
+ // The primitive literal holds the unscaled value.
+ let unscaled_value = BigInt::from(*val);
+ // Convert into two's-complement byte representation of the
BigInt
+ // in big-endian byte order.
+ let mut bytes = unscaled_value.to_signed_bytes_be();
+ // NOTE(review): truncate() keeps the leading (most-significant) bytes, so a value wider than required_bytes is silently corrupted rather than rejected; returning DataInvalid would be safer.
+ bytes.truncate(required_bytes as usize);
+
+ ByteBuf::from(bytes)
+ }
+ };
+
+ Ok(buf)
}
/// Creates a boolean value.
@@ -1012,6 +1053,46 @@ impl Datum {
}
}
+ /// Try to create a decimal literal from [`Decimal`] with the given precision, keeping the value's own scale.
+ ///
+ /// Returns an error if `precision` is rejected by `Type::decimal_required_bytes` or `Type::decimal`, or a `DataInvalid` error if the unscaled value's two's-complement encoding needs more bytes than that precision allows; the digit count itself is not compared against `precision`.
+ /// Example:
+ /// ```rust
+ /// use iceberg::spec::Datum;
+ /// use rust_decimal::Decimal;
+ ///
+ /// let t = Datum::decimal_with_precision(Decimal::new(123, 2),
30).unwrap();
+ ///
+ /// assert_eq!(&format!("{t}"), "1.23");
+ /// ```
+ pub fn decimal_with_precision(value: impl Into<Decimal>, precision: u32)
-> Result<Self> {
+ let decimal = value.into();
+ let scale = decimal.scale();
+ // Fit check: compare encoded byte widths (not decimal digits) against the precision's byte budget.
+ let available_bytes = Type::decimal_required_bytes(precision)? as
usize;
+ let unscaled_value = BigInt::from(decimal.mantissa());
+ let actual_bytes = unscaled_value.to_signed_bytes_be();
+ if actual_bytes.len() > available_bytes {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Decimal value {} is too large for precision {}",
+ decimal, precision
+ ),
+ ));
+ }
+ // The literal stores the unscaled mantissa; the scale is carried by the decimal type.
+ let r#type = Type::decimal(precision, scale)?;
+ if let Type::Primitive(p) = r#type {
+ Ok(Self {
+ r#type: p,
+ literal: PrimitiveLiteral::Int128(decimal.mantissa()),
+ })
+ } else {
+ unreachable!("Decimal type must be primitive.")
+ }
+ }
+
/// Convert the datum to `target_type`.
pub fn to(self, target_type: &Type) -> Result<Datum> {
match target_type {
@@ -2729,7 +2810,7 @@ mod tests {
assert_eq!(datum, expected_datum);
let mut writer = apache_avro::Writer::new(&schema, Vec::new());
- writer.append_ser(datum.to_bytes()).unwrap();
+ writer.append_ser(datum.to_bytes().unwrap()).unwrap();
let encoded = writer.into_inner().unwrap();
let reader = apache_avro::Reader::with_schema(&schema,
&*encoded).unwrap();
@@ -3045,6 +3126,53 @@ mod tests {
check_avro_bytes_serde(bytes, Datum::string("iceberg"),
&PrimitiveType::String);
}
+ #[test]
+ fn avro_bytes_decimal() {
+ // (input_bytes, decimal_num, expect_scale, expect_precision). NOTE(review): the precision-1 cases hold a 2-digit unscaled value (42) and are accepted only because decimal_with_precision checks byte width, not digit count.
+ let cases = vec![
+ (vec![4u8, 210u8], 1234, 2, 38),
+ (vec![251u8, 46u8], -1234, 2, 38),
+ (vec![4u8, 210u8], 1234, 3, 38),
+ (vec![251u8, 46u8], -1234, 3, 38),
+ (vec![42u8], 42, 2, 1),
+ (vec![214u8], -42, 2, 1),
+ ];
+
+ for (input_bytes, decimal_num, expect_scale, expect_precision) in
cases {
+ check_avro_bytes_serde(
+ input_bytes,
+ Datum::decimal_with_precision(
+ Decimal::new(decimal_num, expect_scale),
+ expect_precision,
+ )
+ .unwrap(),
+ &PrimitiveType::Decimal {
+ precision: expect_precision,
+ scale: expect_scale,
+ },
+ );
+ }
+ }
+
+ #[test]
+ fn avro_bytes_decimal_expect_error() {
+ // (decimal_num, expect_scale, expect_precision): unscaled values whose two's-complement encoding is wider than the precision's byte budget.
+ let cases = vec![(1234, 2, 1)];
+
+ for (decimal_num, expect_scale, expect_precision) in cases {
+ let result = Datum::decimal_with_precision(
+ Decimal::new(decimal_num, expect_scale),
+ expect_precision,
+ );
+ assert!(result.is_err(), "expect error but got {:?}", result);
+ assert_eq!(
+ result.unwrap_err().kind(),
+ ErrorKind::DataInvalid,
+ "expect error DataInvalid",
+ );
+ }
+ }
+
#[test]
fn avro_convert_test_int() {
check_convert_with_avro(