This is an automated email from the ASF dual-hosted git repository.
clesaec pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/branch-1.11 by this push:
new 3223fad8a AVRO-3779: bigdecimal rust 1 11 (#2586)
3223fad8a is described below
commit 3223fad8a6d1949e3d5545027a8e6a99b342fa3c
Author: Christophe Le Saec <[email protected]>
AuthorDate: Mon Nov 20 14:55:01 2023 +0100
AVRO-3779: bigdecimal rust 1 11 (#2586)
* AVRO-3779: big decimal for rust in 1.11
---
.../en/docs/++version++/Specification/_index.md | 5 +-
lang/rust/Cargo.lock | 14 ++++
lang/rust/avro/Cargo.toml | 1 +
lang/rust/avro/src/decimal.rs | 85 +++++++++++++++++++++-
lang/rust/avro/src/decode.rs | 12 ++-
lang/rust/avro/src/encode.rs | 9 ++-
lang/rust/avro/src/error.rs | 12 +++
lang/rust/avro/src/schema.rs | 43 +++++++++++
lang/rust/avro/src/types.rs | 35 ++++++++-
9 files changed, 205 insertions(+), 11 deletions(-)
diff --git a/doc/content/en/docs/++version++/Specification/_index.md b/doc/content/en/docs/++version++/Specification/_index.md
index 5141d9eae..21760e7a6 100755
--- a/doc/content/en/docs/++version++/Specification/_index.md
+++ b/doc/content/en/docs/++version++/Specification/_index.md
@@ -794,11 +794,11 @@ Scale must be zero or a positive integer less than or equal to the precision.
For the purposes of schema resolution, two schemas that are `decimal` logical
types _match_ if their scales and precisions match.
-**alternative**
As it's not always possible to fix scale and precision in advance for a
decimal field, `big-decimal` is another `decimal` logical type restrict to Avro
_bytes_.
-_only available in Java_
+_Currently only available in Java and Rust_.
+
```json
{
@@ -808,6 +808,7 @@ _only available in Java_
```
Here, as scale property is stored in value itself it needs more bytes than
preceding `decimal` type, but it allows more flexibility.
+
### UUID
The `uuid` logical type represents a random generated universally unique
identifier (UUID).
diff --git a/lang/rust/Cargo.lock b/lang/rust/Cargo.lock
index 25bf160ca..b39cb9a76 100644
--- a/lang/rust/Cargo.lock
+++ b/lang/rust/Cargo.lock
@@ -68,6 +68,7 @@ dependencies = [
"anyhow",
"apache-avro-derive",
"apache-avro-test-helper",
+ "bigdecimal",
"bzip2",
"crc32fast",
"criterion",
@@ -155,6 +156,19 @@ dependencies = [
"rustc-demangle",
]
+[[package]]
+name = "bigdecimal"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "454bca3db10617b88b566f205ed190aedb0e0e6dd4cad61d3988a72e8c5594cb"
+dependencies = [
+ "autocfg",
+ "libm",
+ "num-bigint",
+ "num-integer",
+ "num-traits",
+]
+
[[package]]
name = "bitflags"
version = "1.3.2"
diff --git a/lang/rust/avro/Cargo.toml b/lang/rust/avro/Cargo.toml
index 163bbfe00..b9610a8b5 100644
--- a/lang/rust/avro/Cargo.toml
+++ b/lang/rust/avro/Cargo.toml
@@ -73,6 +73,7 @@ typed-builder = { default-features = false, version = "0.16.2" }
uuid = { default-features = false, version = "1.4.1", features = ["serde", "std"] }
xz2 = { default-features = false, version = "0.1.7", optional = true }
zstd = { default-features = false, version = "0.12.4+zstd.1.5.2", optional = true }
+bigdecimal = "0.4"
[target.'cfg(target_arch = "wasm32")'.dependencies]
quad-rand = { default-features = false, version = "0.2.1" }
diff --git a/lang/rust/avro/src/decimal.rs b/lang/rust/avro/src/decimal.rs
index 0139e3719..5a88f8670 100644
--- a/lang/rust/avro/src/decimal.rs
+++ b/lang/rust/avro/src/decimal.rs
@@ -15,8 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-use crate::{AvroResult, Error};
+use crate::{
+ decode::{decode_len, decode_long},
+ encode::{encode_bytes, encode_long},
+ types::Value,
+ AvroResult, Error,
+};
+use bigdecimal::BigDecimal;
use num_bigint::{BigInt, Sign};
+use std::io::Read;
#[derive(Debug, Clone)]
pub struct Decimal {
@@ -105,12 +112,47 @@ impl<T: AsRef<[u8]>> From<T> for Decimal {
}
}
+pub(crate) fn serialize_big_decimal(decimal: &BigDecimal) -> Vec<u8> {
+ let mut buffer: Vec<u8> = Vec::new();
+ let (big_int, exponent): (BigInt, i64) = decimal.as_bigint_and_exponent();
+ let big_endian_value: Vec<u8> = big_int.to_signed_bytes_be();
+ encode_bytes(&big_endian_value, &mut buffer);
+ encode_long(exponent, &mut buffer);
+
+ buffer
+}
+
+pub(crate) fn deserialize_big_decimal(bytes: &Vec<u8>) -> Result<BigDecimal, Error> {
+ let mut bytes: &[u8] = bytes.as_slice();
+ let mut big_decimal_buffer = match decode_len(&mut bytes) {
+ Ok(size) => vec![0u8; size],
+ Err(_err) => return Err(Error::BigDecimalLen),
+ };
+
+ bytes
+ .read_exact(&mut big_decimal_buffer[..])
+ .map_err(Error::ReadDouble)?;
+
+ match decode_long(&mut bytes) {
+ Ok(Value::Long(scale_value)) => {
+ let big_int: BigInt = BigInt::from_signed_bytes_be(&big_decimal_buffer);
+ let decimal = BigDecimal::new(big_int, scale_value);
+ Ok(decimal)
+ }
+ _ => Err(Error::BigDecimalScale),
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
use apache_avro_test_helper::TestResult;
+ use bigdecimal::{One, Zero};
use pretty_assertions::assert_eq;
- use std::convert::TryFrom;
+ use std::{
+ convert::TryFrom,
+ ops::{Div, Mul},
+ };
#[test]
fn test_decimal_from_bytes_from_ref_decimal() -> TestResult {
@@ -133,4 +175,43 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn test_avro_3779_bigdecimal_serial() -> TestResult {
+ let value: bigdecimal::BigDecimal =
+ bigdecimal::BigDecimal::from(-1421).div(bigdecimal::BigDecimal::from(2));
+ let mut current: bigdecimal::BigDecimal = bigdecimal::BigDecimal::one();
+
+ for iter in 1..180 {
+ let result: Vec<u8> = serialize_big_decimal(&current);
+
+ let deserialize_big_decimal: Result<bigdecimal::BigDecimal, Error> =
+ deserialize_big_decimal(&result);
+ assert!(
+ deserialize_big_decimal.is_ok(),
+ "can't deserialize for iter {iter}"
+ );
+ assert_eq!(
+ current,
+ deserialize_big_decimal.unwrap(),
+ "not equals for ${iter}"
+ );
+ current = current.mul(&value);
+ }
+
+ let result: Vec<u8> = serialize_big_decimal(&BigDecimal::zero());
+ let deserialize_big_decimal: Result<bigdecimal::BigDecimal, Error> =
+ deserialize_big_decimal(&result);
+ assert!(
+ deserialize_big_decimal.is_ok(),
+ "can't deserialize for zero"
+ );
+ assert_eq!(
+ BigDecimal::zero(),
+ deserialize_big_decimal.unwrap(),
+ "not equals for zero"
+ );
+
+ Ok(())
+ }
}
diff --git a/lang/rust/avro/src/decode.rs b/lang/rust/avro/src/decode.rs
index b13c76739..7857bbec5 100644
--- a/lang/rust/avro/src/decode.rs
+++ b/lang/rust/avro/src/decode.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::{
- decimal::Decimal,
+ decimal::{deserialize_big_decimal, Decimal},
duration::Duration,
schema::{
DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, RecordSchema, ResolvedSchema,
@@ -36,7 +36,7 @@ use std::{
use uuid::Uuid;
#[inline]
-fn decode_long<R: Read>(reader: &mut R) -> AvroResult<Value> {
+pub(crate) fn decode_long<R: Read>(reader: &mut R) -> AvroResult<Value> {
zag_i64(reader).map(Value::Long)
}
@@ -46,7 +46,7 @@ fn decode_int<R: Read>(reader: &mut R) -> AvroResult<Value> {
}
#[inline]
-fn decode_len<R: Read>(reader: &mut R) -> AvroResult<usize> {
+pub(crate) fn decode_len<R: Read>(reader: &mut R) -> AvroResult<usize> {
let len = zag_i64(reader)?;
safe_len(usize::try_from(len).map_err(|e| Error::ConvertI64ToUsize(e, len))?)
}
@@ -114,6 +114,12 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
},
schema => Err(Error::ResolveDecimalSchema(schema.into())),
},
+ Schema::BigDecimal => {
+ match decode_internal(&Schema::Bytes, names, enclosing_namespace, reader)? {
+ Value::Bytes(bytes) => deserialize_big_decimal(&bytes).map(Value::BigDecimal),
+ value => Err(Error::BytesValue(value.into())),
+ }
+ }
Schema::Uuid => Ok(Value::Uuid(
Uuid::from_str(
match decode_internal(&Schema::String, names, enclosing_namespace, reader)? {
diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs
index 5d9a66ce3..1f7e40da9 100644
--- a/lang/rust/avro/src/encode.rs
+++ b/lang/rust/avro/src/encode.rs
@@ -16,6 +16,7 @@
// under the License.
use crate::{
+ decimal::serialize_big_decimal,
schema::{
DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, RecordSchema, ResolvedSchema,
Schema, SchemaKind, UnionSchema,
@@ -40,13 +41,13 @@ pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) -> AvroResul
encode_internal(value, schema, rs.get_names(), &None, buffer)
}
-fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
+pub(crate) fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
let bytes = s.as_ref();
encode_long(bytes.len() as i64, buffer);
buffer.extend_from_slice(bytes);
}
-fn encode_long(i: i64, buffer: &mut Vec<u8>) {
+pub(crate) fn encode_long(i: i64, buffer: &mut Vec<u8>) {
zig_i64(i, buffer)
}
@@ -128,6 +129,10 @@ pub(crate) fn encode_internal<S: Borrow<Schema>>(
&uuid.to_string(),
buffer,
),
+ Value::BigDecimal(bg) => {
+ let mut buf: Vec<u8> = serialize_big_decimal(bg);
+ buffer.append(&mut buf);
+ }
Value::Bytes(bytes) => match *schema {
Schema::Bytes => encode_bytes(bytes, buffer),
Schema::Fixed { .. } => buffer.extend(bytes),
diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs
index c1d1173bd..30a192069 100644
--- a/lang/rust/avro/src/error.rs
+++ b/lang/rust/avro/src/error.rs
@@ -115,6 +115,9 @@ pub enum Error {
#[error("expected UUID, got: {0:?}")]
GetUuid(ValueKind),
+ #[error("expected BigDecimal, got: {0:?}")]
+ GetBigdecimal(ValueKind),
+
#[error("Fixed bytes of size 12 expected, got Fixed of size {0}")]
GetDecimalFixedBytes(usize),
@@ -289,6 +292,15 @@ pub enum Error {
#[error("The decimal precision ({precision}) must be a positive number")]
DecimalPrecisionMuBePositive { precision: usize },
+ #[error("Unreadable decimal sign")]
+ BigDecimalSign,
+
+ #[error("Unreadable length for decimal inner bytes")]
+ BigDecimalLen,
+
+ #[error("Unreadable decimal scale")]
+ BigDecimalScale,
+
#[error("Unexpected `type` {0} variant for `logicalType`")]
GetLogicalTypeVariant(serde_json::Value),
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 07f57e8cb..673eb806c 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -112,6 +112,9 @@ pub enum Schema {
/// Logical type which represents `Decimal` values. The underlying type is serialized and
/// deserialized as `Schema::Bytes` or `Schema::Fixed`.
Decimal(DecimalSchema),
+ /// Logical type which represents `Decimal` values without predefined scale.
+ /// The underlying type is serialized and deserialized as `Schema::Bytes`
+ BigDecimal,
/// A universally unique identifier, annotating a string.
Uuid,
/// Logical type which represents the number of days since the unix epoch.
@@ -189,6 +192,7 @@ impl From<&types::Value> for SchemaKind {
Value::Enum(_, _) => Self::Enum,
Value::Fixed(_, _) => Self::Fixed,
Value::Decimal { .. } => Self::Decimal,
+ Value::BigDecimal(_) => Self::BigDecimal,
Value::Uuid(_) => Self::Uuid,
Value::Date(_) => Self::Date,
Value::TimeMillis(_) => Self::TimeMillis,
@@ -1334,6 +1338,14 @@ impl Parser {
},
);
}
+ "big-decimal" => {
+ return try_convert_to_logical_type(
+ "big-decimal",
+ parse_as_native_complex(complex, self, enclosing_namespace)?,
+ &[SchemaKind::Bytes],
+ |_| -> AvroResult<Schema> { Ok(Schema::BigDecimal) },
+ );
+ }
"uuid" => {
return try_convert_to_logical_type(
"uuid",
@@ -1888,6 +1900,12 @@ impl Serialize for Schema {
map.serialize_entry("precision", precision)?;
map.end()
}
+ Schema::BigDecimal => {
+ let mut map = serializer.serialize_map(None)?;
+ map.serialize_entry("type", "bytes")?;
+ map.serialize_entry("logicalType", "big-decimal")?;
+ map.end()
+ }
Schema::Uuid => {
let mut map = serializer.serialize_map(None)?;
map.serialize_entry("type", "string")?;
@@ -5096,6 +5114,31 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_avro_3779_bigdecimal_schema() -> TestResult {
+ let schema = json!(
+ {
+ "type": "record",
+ "name": "recordWithDecimal",
+ "fields": [
+ {
+ "name": "decimal",
+ "type": "bytes",
+ "logicalType": "big-decimal"
+ }
+ ]
+ });
+
+ let parse_result = Schema::parse(&schema);
+ assert!(
+ parse_result.is_ok(),
+ "parse result must be ok, got: {:?}",
+ parse_result
+ );
+
+ Ok(())
+ }
+
#[test]
fn test_avro_3820_deny_invalid_field_names() -> TestResult {
let schema_str = r#"
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index 006a18e1b..c90e69933 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -17,7 +17,7 @@
//! Logic handling the intermediate representation of Avro values.
use crate::{
- decimal::Decimal,
+ decimal::{deserialize_big_decimal, serialize_big_decimal, Decimal},
duration::Duration,
schema::{
DecimalSchema, EnumSchema, FixedSchema, Name, Namespace, Precision, RecordField,
@@ -25,6 +25,7 @@ use crate::{
},
AvroResult, Error,
};
+use bigdecimal::BigDecimal;
use serde_json::{Number, Value as JsonValue};
use std::{
borrow::Borrow,
@@ -100,6 +101,8 @@ pub enum Value {
Date(i32),
/// An Avro Decimal value. Bytes are in big-endian order, per the Avro spec.
Decimal(Decimal),
+ /// An Avro Decimal value.
+ BigDecimal(BigDecimal),
/// Time in milliseconds.
TimeMillis(i32),
/// Time in microseconds.
@@ -154,6 +157,7 @@ to_value!(String, Value::String);
to_value!(Vec<u8>, Value::Bytes);
to_value!(uuid::Uuid, Value::Uuid);
to_value!(Decimal, Value::Decimal);
+to_value!(BigDecimal, Value::BigDecimal);
to_value!(Duration, Value::Duration);
impl From<()> for Value {
@@ -327,6 +331,10 @@ impl TryFrom<Value> for JsonValue {
Value::Date(d) => Ok(Self::Number(d.into())),
Value::Decimal(ref d) => <Vec<u8>>::try_from(d)
.map(|vec| Self::Array(vec.into_iter().map(|v| v.into()).collect())),
+ Value::BigDecimal(ref bg) => {
+ let vec1: Vec<u8> = serialize_big_decimal(bg);
+ Ok(Self::Array(vec1.into_iter().map(|b| b.into()).collect()))
+ }
Value::TimeMillis(t) => Ok(Self::Number(t.into())),
Value::TimeMicros(t) => Ok(Self::Number(t.into())),
Value::TimestampMillis(t) => Ok(Self::Number(t.into())),
@@ -425,6 +433,7 @@ impl Value {
(&Value::TimeMillis(_), &Schema::TimeMillis) => None,
(&Value::Date(_), &Schema::Date) => None,
(&Value::Decimal(_), &Schema::Decimal { .. }) => None,
+ (&Value::BigDecimal(_), &Schema::BigDecimal) => None,
(&Value::Duration(_), &Schema::Duration) => None,
(&Value::Uuid(_), &Schema::Uuid) => None,
(&Value::Float(_), &Schema::Float) => None,
@@ -634,7 +643,6 @@ impl Value {
};
self = v;
}
-
match *schema {
Schema::Ref { ref name } => {
let name = name.fully_qualified_name(enclosing_namespace);
@@ -674,6 +682,7 @@ impl Value {
precision,
ref inner,
}) => self.resolve_decimal(precision, scale, inner),
+ Schema::BigDecimal => self.resolve_bigdecimal(),
Schema::Date => self.resolve_date(),
Schema::TimeMillis => self.resolve_time_millis(),
Schema::TimeMicros => self.resolve_time_micros(),
@@ -696,6 +705,14 @@ impl Value {
})
}
+ fn resolve_bigdecimal(self) -> Result<Self, Error> {
+ Ok(match self {
+ bg @ Value::BigDecimal(_) => bg,
+ Value::Bytes(b) => Value::BigDecimal(deserialize_big_decimal(&b).unwrap()),
+ other => return Err(Error::GetBigdecimal(other.into())),
+ })
+ }
+
fn resolve_duration(self) -> Result<Self, Error> {
Ok(match self {
duration @ Value::Duration { .. } => duration,
@@ -2968,7 +2985,21 @@ Field with name '"b"' is not a member of the map items"#,
attributes: Default::default()
}))
.is_err(),);
+ Ok(())
+ }
+ #[test]
+ fn test_avro_3779_bigdecimal_resolving() -> TestResult {
+ let schema =
+ r#"{"name": "bigDecimalSchema", "logicalType": "big-decimal", "type": "bytes" }"#;
+
+ let avro_value = Value::BigDecimal(BigDecimal::from(12345678u32));
+ let schema = Schema::parse_str(schema)?;
+ let resolve_result: AvroResult<Value> = avro_value.resolve(&schema);
+ assert!(
+ resolve_result.is_ok(),
+ "resolve result must be ok, got: {resolve_result:?}"
+ );
Ok(())
}
}