Fokko commented on code in PR #20: URL: https://github.com/apache/iceberg-rust/pull/20#discussion_r1283265597
########## crates/iceberg/src/spec/values.rs: ########## @@ -0,0 +1,663 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/*! + * Value in iceberg + */ + +use std::{any::Any, collections::HashMap, fmt, ops::Deref}; + +use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; +use ordered_float::OrderedFloat; +use rust_decimal::Decimal; +use serde::{ + de::{MapAccess, Visitor}, + ser::SerializeStruct, + Deserialize, Deserializer, Serialize, +}; +use serde_bytes::ByteBuf; +use uuid::Uuid; + +use crate::Error; + +use super::datatypes::{PrimitiveType, Type}; + +/// Values present in iceberg type +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +#[serde(untagged)] +pub enum Value { Review Comment: In Python and Java, we refer to this as a literal. ########## crates/iceberg/src/spec/values.rs: ########## @@ -0,0 +1,663 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/*! + * Value in iceberg + */ + +use std::{any::Any, collections::HashMap, fmt, ops::Deref}; + +use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; +use ordered_float::OrderedFloat; +use rust_decimal::Decimal; +use serde::{ + de::{MapAccess, Visitor}, + ser::SerializeStruct, + Deserialize, Deserializer, Serialize, +}; +use serde_bytes::ByteBuf; +use uuid::Uuid; + +use crate::Error; + +use super::datatypes::{PrimitiveType, Type}; + +/// Values present in iceberg type +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +#[serde(untagged)] +pub enum Value { + /// 0x00 for false, non-zero byte for true + Boolean(bool), + /// Stored as 4-byte little-endian + Int(i32), + /// Stored as 8-byte little-endian + Long(i64), + /// Stored as 4-byte little-endian + Float(#[serde(with = "float")] OrderedFloat<f32>), + /// Stored as 8-byte little-endian + Double(#[serde(with = "double")] OrderedFloat<f64>), + /// Stores days from the 1970-01-01 in an 4-byte little-endian int + Date(#[serde(with = "date")] NaiveDate), + /// Stores microseconds from midnight in an 8-byte little-endian long + Time(#[serde(with = "time")] NaiveTime), + /// Timestamp without timezone + Timestamp(#[serde(with = "timestamp")] NaiveDateTime), + /// Timestamp with timezone + TimestampTZ(#[serde(with = "timestamptz")] DateTime<Utc>), + /// UTF-8 bytes (without length) + String(String), + /// 16-byte big-endian value + UUID(Uuid), + /// Binary value + Fixed(usize, Vec<u8>), + /// Binary value (without length) + Binary(Vec<u8>), + /// Stores unscaled value as 
two’s-complement big-endian binary, + /// using the minimum number of bytes for the value + Decimal(Decimal), + /// A struct is a tuple of typed values. Each field in the tuple is named and has an integer id that is unique in the table schema. + /// Each field can be either optional or required, meaning that values can (or cannot) be null. Fields may be any type. + /// Fields may have an optional comment or doc string. Fields can have default values. + Struct(Struct), + /// A list is a collection of values with some element type. + /// The element field has an integer id that is unique in the table schema. + /// Elements can be either optional or required. Element types may be any type. + List(Vec<Option<Value>>), + /// A map is a collection of key-value pairs with a key type and a value type. + /// Both the key field and value field each have an integer id that is unique in the table schema. + /// Map keys are required and map values can be either optional or required. Both map keys and map values may be any type, including nested types. 
+ Map(HashMap<String, Option<Value>>), +} + +impl TryFrom<Value> for ByteBuf { + type Error = Error; + fn try_from(value: Value) -> Result<Self, Self::Error> { + match value { + Value::Boolean(val) => { + if val { + Ok(ByteBuf::from([0u8])) + } else { + Ok(ByteBuf::from([1u8])) + } + } + Value::Int(val) => Ok(ByteBuf::from(val.to_le_bytes())), + Value::Long(val) => Ok(ByteBuf::from(val.to_le_bytes())), + Value::Float(val) => Ok(ByteBuf::from(val.to_le_bytes())), + Value::Double(val) => Ok(ByteBuf::from(val.to_le_bytes())), + Value::Date(val) => Ok(ByteBuf::from(date::date_to_days(&val)?.to_le_bytes())), + Value::Time(val) => Ok(ByteBuf::from( + time::time_to_microseconds(&val)?.to_le_bytes(), + )), + Value::Timestamp(val) => Ok(ByteBuf::from( + timestamp::datetime_to_microseconds(&val)?.to_le_bytes(), + )), + Value::TimestampTZ(val) => Ok(ByteBuf::from( + timestamptz::datetimetz_to_microseconds(&val)?.to_le_bytes(), + )), + Value::String(val) => Ok(ByteBuf::from(val.as_bytes())), + Value::UUID(val) => Ok(ByteBuf::from(val.as_u128().to_be_bytes())), + Value::Fixed(_, val) => Ok(ByteBuf::from(val)), + Value::Binary(val) => Ok(ByteBuf::from(val)), + _ => todo!(), + } + } +} + +/// The partition struct stores the tuple of partition values for each file. +/// Its type is derived from the partition fields of the partition spec used to write the manifest file. +/// In v2, the partition struct’s field ids must match the ids from the partition spec. +#[derive(Debug, Clone, PartialEq)] +pub struct Struct { + /// Vector to store the field values + fields: Vec<Option<Value>>, + /// A lookup that matches the field name to the entry in the vector + lookup: HashMap<String, usize>, +} + +impl Deref for Struct { + type Target = [Option<Value>]; + + fn deref(&self) -> &Self::Target { + &self.fields + } +} + +impl Struct { + /// Get reference to partition value + pub fn get(&self, name: &str) -> Option<&Value> { + self.fields + .get(*self.lookup.get(name)?) 
+ .and_then(|x| x.as_ref()) + } + /// Get mutable reference to partition value + pub fn get_mut(&mut self, name: &str) -> Option<&mut Value> { + self.fields + .get_mut(*self.lookup.get(name)?) + .and_then(|x| x.as_mut()) + } +} + +impl Serialize for Struct { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut record = serializer.serialize_struct("r102", self.fields.len())?; + for (i, value) in self.fields.iter().enumerate() { + let (key, _) = self.lookup.iter().find(|(_, value)| **value == i).unwrap(); + record.serialize_field(Box::leak(key.clone().into_boxed_str()), value)?; + } + record.end() + } +} + +impl<'de> Deserialize<'de> for Struct { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + struct PartitionStructVisitor; + + impl<'de> Visitor<'de> for PartitionStructVisitor { + type Value = Struct; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("map") + } + + fn visit_map<V>(self, mut map: V) -> Result<Struct, V::Error> + where + V: MapAccess<'de>, + { + let mut fields: Vec<Option<Value>> = Vec::new(); + let mut lookup: HashMap<String, usize> = HashMap::new(); + let mut index = 0; + while let Some(key) = map.next_key()? 
{ + fields.push(map.next_value()?); + lookup.insert(key, index); + index += 1; + } + Ok(Struct { fields, lookup }) + } + } + deserializer.deserialize_struct( + "r102", + Box::leak(vec![].into_boxed_slice()), + PartitionStructVisitor, + ) + } +} + +impl FromIterator<(String, Option<Value>)> for Struct { + fn from_iter<I: IntoIterator<Item = (String, Option<Value>)>>(iter: I) -> Self { + let mut fields = Vec::new(); + let mut lookup = HashMap::new(); + + for (i, (key, value)) in iter.into_iter().enumerate() { + fields.push(value); + lookup.insert(key, i); + } + + Struct { fields, lookup } + } +} + +impl Value { + #[inline] + /// Create iceberg value from bytes + pub fn try_from_bytes(bytes: &[u8], data_type: &Type) -> Result<Self, Error> { + match data_type { + Type::Primitive(primitive) => match primitive { + PrimitiveType::Boolean => { + if bytes.len() == 1 && bytes[0] == 0u8 { + Ok(Value::Boolean(false)) + } else { + Ok(Value::Boolean(true)) + } + } + PrimitiveType::Int => Ok(Value::Int(i32::from_le_bytes(bytes.try_into()?))), + PrimitiveType::Long => Ok(Value::Long(i64::from_le_bytes(bytes.try_into()?))), + PrimitiveType::Float => Ok(Value::Float(OrderedFloat(f32::from_le_bytes( + bytes.try_into()?, + )))), + PrimitiveType::Double => Ok(Value::Double(OrderedFloat(f64::from_le_bytes( + bytes.try_into()?, + )))), + PrimitiveType::Date => Ok(Value::Date(date::days_to_date(i32::from_le_bytes( + bytes.try_into()?, + ))?)), + PrimitiveType::Time => Ok(Value::Time(time::microseconds_to_time( + i64::from_le_bytes(bytes.try_into()?), + )?)), + PrimitiveType::Timestamp => Ok(Value::Timestamp( + timestamp::microseconds_to_datetime(i64::from_le_bytes(bytes.try_into()?))?, + )), + PrimitiveType::Timestamptz => Ok(Value::TimestampTZ( + timestamptz::microseconds_to_datetimetz(i64::from_le_bytes(bytes.try_into()?))?, + )), + PrimitiveType::String => Ok(Value::String(std::str::from_utf8(bytes)?.to_string())), + PrimitiveType::Uuid => 
Ok(Value::UUID(Uuid::from_u128(u128::from_be_bytes( + bytes.try_into()?, + )))), + PrimitiveType::Fixed(len) => Ok(Value::Fixed(*len as usize, Vec::from(bytes))), + PrimitiveType::Binary => Ok(Value::Binary(Vec::from(bytes))), + _ => Err(Error::new( + crate::ErrorKind::DataInvalid, + "Converting bytes to decimal is not supported.", + )), + }, + _ => Err(Error::new( + crate::ErrorKind::DataInvalid, + "Converting bytes to non-primitive types is not supported.", + )), + } + } + + /// Get datatype of value + pub fn datatype(&self) -> Type { + match self { + Value::Boolean(_) => Type::Primitive(PrimitiveType::Boolean), + Value::Int(_) => Type::Primitive(PrimitiveType::Int), + Value::Long(_) => Type::Primitive(PrimitiveType::Long), + Value::Float(_) => Type::Primitive(PrimitiveType::Float), + Value::Double(_) => Type::Primitive(PrimitiveType::Double), + Value::Date(_) => Type::Primitive(PrimitiveType::Date), + Value::Time(_) => Type::Primitive(PrimitiveType::Time), + Value::Timestamp(_) => Type::Primitive(PrimitiveType::Timestamp), + Value::TimestampTZ(_) => Type::Primitive(PrimitiveType::Timestamptz), + Value::Fixed(len, _) => Type::Primitive(PrimitiveType::Fixed(*len as u64)), + Value::Binary(_) => Type::Primitive(PrimitiveType::Binary), + Value::String(_) => Type::Primitive(PrimitiveType::String), + Value::UUID(_) => Type::Primitive(PrimitiveType::Uuid), + Value::Decimal(dec) => Type::Primitive(PrimitiveType::Decimal { + precision: 38, Review Comment: I think we want to make the precision configurable as well. ########## crates/iceberg/src/spec/values.rs: ########## @@ -0,0 +1,417 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/*! + * Value in iceberg + */ + +use std::{any::Any, collections::HashMap, fmt, ops::Deref}; + +use rust_decimal::Decimal; +use serde::{ + de::{MapAccess, Visitor}, + ser::SerializeStruct, + Deserialize, Deserializer, Serialize, +}; +use serde_bytes::ByteBuf; + +use crate::Error; + +use super::datatypes::{PrimitiveType, Type}; + +/// Values present in iceberg type +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +#[serde(untagged)] +pub enum Value { + /// 0x00 for false, non-zero byte for true + Boolean(bool), + /// Stored as 4-byte little-endian + Int(i32), + /// Stored as 8-byte little-endian + LongInt(i64), + /// Stored as 4-byte little-endian + Float(f32), + /// Stored as 8-byte little-endian + Double(f64), + /// Stores days from the 1970-01-01 in an 4-byte little-endian int + Date(i32), + /// Stores microseconds from midnight in an 8-byte little-endian long + Time(i64), + /// Stores microseconds from 1970-01-01 00:00:00.000000 in an 8-byte little-endian long + Timestamp(i64), + /// Stores microseconds from 1970-01-01 00:00:00.000000 in an 8-byte little-endian long + TimestampTZ(i64), + /// UTF-8 bytes (without length) + String(String), + /// 16-byte big-endian value + UUID(i128), + /// Binary value + Fixed(usize, Vec<u8>), Review Comment: Agree as well :) ########## crates/iceberg/src/spec/values.rs: ########## @@ -0,0 +1,417 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/*! + * Value in iceberg + */ + +use std::{any::Any, collections::HashMap, fmt, ops::Deref}; + +use rust_decimal::Decimal; +use serde::{ + de::{MapAccess, Visitor}, + ser::SerializeStruct, + Deserialize, Deserializer, Serialize, +}; +use serde_bytes::ByteBuf; + +use crate::Error; + +use super::datatypes::{PrimitiveType, Type}; + +/// Values present in iceberg type +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +#[serde(untagged)] +pub enum Value { + /// 0x00 for false, non-zero byte for true + Boolean(bool), + /// Stored as 4-byte little-endian + Int(i32), + /// Stored as 8-byte little-endian + LongInt(i64), + /// Stored as 4-byte little-endian + Float(f32), + /// Stored as 8-byte little-endian + Double(f64), + /// Stores days from the 1970-01-01 in an 4-byte little-endian int + Date(i32), + /// Stores microseconds from midnight in an 8-byte little-endian long + Time(i64), + /// Stores microseconds from 1970-01-01 00:00:00.000000 in an 8-byte little-endian long + Timestamp(i64), + /// Stores microseconds from 1970-01-01 00:00:00.000000 in an 8-byte little-endian long + TimestampTZ(i64), + /// UTF-8 bytes (without length) + String(String), + /// 16-byte big-endian value + UUID(i128), + /// Binary value + Fixed(usize, Vec<u8>), + /// Binary value (without 
length) + Binary(Vec<u8>), + /// Stores unscaled value as two’s-complement big-endian binary, + /// using the minimum number of bytes for the value + Decimal(Decimal), + /// A struct is a tuple of typed values. Each field in the tuple is named and has an integer id that is unique in the table schema. + /// Each field can be either optional or required, meaning that values can (or cannot) be null. Fields may be any type. + /// Fields may have an optional comment or doc string. Fields can have default values. + Struct(Struct), + /// A list is a collection of values with some element type. + /// The element field has an integer id that is unique in the table schema. + /// Elements can be either optional or required. Element types may be any type. + List(Vec<Option<Value>>), + /// A map is a collection of key-value pairs with a key type and a value type. + /// Both the key field and value field each have an integer id that is unique in the table schema. + /// Map keys are required and map values can be either optional or required. Both map keys and map values may be any type, including nested types. 
+ Map(HashMap<String, Option<Value>>), +} + +impl From<Value> for ByteBuf { + fn from(value: Value) -> Self { + match value { + Value::Boolean(val) => { + if val { + ByteBuf::from([0u8]) + } else { + ByteBuf::from([1u8]) + } + } + Value::Int(val) => ByteBuf::from(val.to_le_bytes()), + Value::LongInt(val) => ByteBuf::from(val.to_le_bytes()), + Value::Float(val) => ByteBuf::from(val.to_le_bytes()), + Value::Double(val) => ByteBuf::from(val.to_le_bytes()), + Value::Date(val) => ByteBuf::from(val.to_le_bytes()), + Value::Time(val) => ByteBuf::from(val.to_le_bytes()), + Value::Timestamp(val) => ByteBuf::from(val.to_le_bytes()), + Value::TimestampTZ(val) => ByteBuf::from(val.to_le_bytes()), + Value::String(val) => ByteBuf::from(val.as_bytes()), + Value::UUID(val) => ByteBuf::from(val.to_be_bytes()), + Value::Fixed(_, val) => ByteBuf::from(val), + Value::Binary(val) => ByteBuf::from(val), + _ => todo!(), + } + } +} + +/// The partition struct stores the tuple of partition values for each file. +/// Its type is derived from the partition fields of the partition spec used to write the manifest file. +/// In v2, the partition struct’s field ids must match the ids from the partition spec. +#[derive(Debug, Clone, PartialEq)] +pub struct Struct { + /// Vector to store the field values + fields: Vec<Option<Value>>, + /// A lookup that matches the field name to the entry in the vector + lookup: HashMap<String, usize>, Review Comment: I see, thanks for the context. So in Python we deserialize the `102: partition struct<...>` into a `Record`, which implements `StructProtocol`. Since we have the schema from the Avro file, we know which values are at which positions, and we can look them up by index (which is super fast!) :) So let's say that you have a `day(dt) as partition_by_day` partitioning, where `dt` is a `Timestamp` in your schema and `partition_by_day` is the hidden partition that buckets the writes per file.
Then the partition struct would look like `struct<partition_by_day: int>`. Suppose the user supplies the expression `dt >= '1990-03-01 19:25:00'`: the literal is read as a string value, but gets converted to a date value when the expression is bound to the schema. Similar to binding a normal expression to a Schema, we also bind the hidden partition filter to the `partition` schema. Since we know the position of the field in the schema, we can just evaluate using an accessor, at index 0 in this case. We take the value and, using the evaluator, check whether it might match; if that's the case, we include the file in the scan. Code-wise, this is all done in the [ManifestEvaluator](https://github.com/apache/iceberg/blob/master/python/pyiceberg/expressions/visitors.py#L783-L789) ########## crates/iceberg/src/spec/values.rs: ########## @@ -0,0 +1,417 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/*!
+ * Value in iceberg + */ + +use std::{any::Any, collections::HashMap, fmt, ops::Deref}; + +use rust_decimal::Decimal; +use serde::{ + de::{MapAccess, Visitor}, + ser::SerializeStruct, + Deserialize, Deserializer, Serialize, +}; +use serde_bytes::ByteBuf; + +use crate::Error; + +use super::datatypes::{PrimitiveType, Type}; + +/// Values present in iceberg type +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +#[serde(untagged)] +pub enum Value { + /// 0x00 for false, non-zero byte for true + Boolean(bool), + /// Stored as 4-byte little-endian + Int(i32), + /// Stored as 8-byte little-endian + LongInt(i64), + /// Stored as 4-byte little-endian + Float(f32), + /// Stored as 8-byte little-endian + Double(f64), + /// Stores days from the 1970-01-01 in an 4-byte little-endian int + Date(i32), + /// Stores microseconds from midnight in an 8-byte little-endian long + Time(i64), + /// Stores microseconds from 1970-01-01 00:00:00.000000 in an 8-byte little-endian long + Timestamp(i64), + /// Stores microseconds from 1970-01-01 00:00:00.000000 in an 8-byte little-endian long + TimestampTZ(i64), + /// UTF-8 bytes (without length) + String(String), + /// 16-byte big-endian value + UUID(i128), + /// Binary value + Fixed(usize, Vec<u8>), + /// Binary value (without length) + Binary(Vec<u8>), + /// Stores unscaled value as two’s-complement big-endian binary, + /// using the minimum number of bytes for the value + Decimal(Decimal), + /// A struct is a tuple of typed values. Each field in the tuple is named and has an integer id that is unique in the table schema. + /// Each field can be either optional or required, meaning that values can (or cannot) be null. Fields may be any type. + /// Fields may have an optional comment or doc string. Fields can have default values. + Struct(Struct), + /// A list is a collection of values with some element type. + /// The element field has an integer id that is unique in the table schema. 
+ /// Elements can be either optional or required. Element types may be any type. + List(Vec<Option<Value>>), + /// A map is a collection of key-value pairs with a key type and a value type. + /// Both the key field and value field each have an integer id that is unique in the table schema. + /// Map keys are required and map values can be either optional or required. Both map keys and map values may be any type, including nested types. + Map(HashMap<String, Option<Value>>), +} + +impl From<Value> for ByteBuf { + fn from(value: Value) -> Self { + match value { + Value::Boolean(val) => { + if val { + ByteBuf::from([0u8]) + } else { + ByteBuf::from([1u8]) + } + } + Value::Int(val) => ByteBuf::from(val.to_le_bytes()), + Value::LongInt(val) => ByteBuf::from(val.to_le_bytes()), + Value::Float(val) => ByteBuf::from(val.to_le_bytes()), + Value::Double(val) => ByteBuf::from(val.to_le_bytes()), + Value::Date(val) => ByteBuf::from(val.to_le_bytes()), + Value::Time(val) => ByteBuf::from(val.to_le_bytes()), + Value::Timestamp(val) => ByteBuf::from(val.to_le_bytes()), + Value::TimestampTZ(val) => ByteBuf::from(val.to_le_bytes()), + Value::String(val) => ByteBuf::from(val.as_bytes()), + Value::UUID(val) => ByteBuf::from(val.to_be_bytes()), + Value::Fixed(_, val) => ByteBuf::from(val), + Value::Binary(val) => ByteBuf::from(val), + _ => todo!(), + } + } +} + +/// The partition struct stores the tuple of partition values for each file. +/// Its type is derived from the partition fields of the partition spec used to write the manifest file. +/// In v2, the partition struct’s field ids must match the ids from the partition spec. 
+#[derive(Debug, Clone, PartialEq)] +pub struct Struct { + /// Vector to store the field values + fields: Vec<Option<Value>>, + /// A lookup that matches the field name to the entry in the vector + lookup: HashMap<String, usize>, +} + +impl Deref for Struct { + type Target = [Option<Value>]; + + fn deref(&self) -> &Self::Target { + &self.fields + } +} + +impl Struct { + /// Get reference to partition value + pub fn get(&self, name: &str) -> Option<&Value> { Review Comment: I agree, see my comment just above. ########## crates/iceberg/src/spec/values.rs: ########## @@ -0,0 +1,663 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/*! 
+ * Value in iceberg + */ + +use std::{any::Any, collections::HashMap, fmt, ops::Deref}; + +use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; +use ordered_float::OrderedFloat; +use rust_decimal::Decimal; +use serde::{ + de::{MapAccess, Visitor}, + ser::SerializeStruct, + Deserialize, Deserializer, Serialize, +}; +use serde_bytes::ByteBuf; +use uuid::Uuid; + +use crate::Error; + +use super::datatypes::{PrimitiveType, Type}; + +/// Values present in iceberg type +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +#[serde(untagged)] +pub enum Value { + /// 0x00 for false, non-zero byte for true + Boolean(bool), + /// Stored as 4-byte little-endian + Int(i32), + /// Stored as 8-byte little-endian + Long(i64), + /// Stored as 4-byte little-endian + Float(#[serde(with = "float")] OrderedFloat<f32>), + /// Stored as 8-byte little-endian + Double(#[serde(with = "double")] OrderedFloat<f64>), + /// Stores days from the 1970-01-01 in an 4-byte little-endian int + Date(#[serde(with = "date")] NaiveDate), + /// Stores microseconds from midnight in an 8-byte little-endian long + Time(#[serde(with = "time")] NaiveTime), + /// Timestamp without timezone + Timestamp(#[serde(with = "timestamp")] NaiveDateTime), + /// Timestamp with timezone + TimestampTZ(#[serde(with = "timestamptz")] DateTime<Utc>), + /// UTF-8 bytes (without length) + String(String), + /// 16-byte big-endian value + UUID(Uuid), + /// Binary value + Fixed(usize, Vec<u8>), + /// Binary value (without length) + Binary(Vec<u8>), + /// Stores unscaled value as two’s-complement big-endian binary, + /// using the minimum number of bytes for the value + Decimal(Decimal), + /// A struct is a tuple of typed values. Each field in the tuple is named and has an integer id that is unique in the table schema. + /// Each field can be either optional or required, meaning that values can (or cannot) be null. Fields may be any type. + /// Fields may have an optional comment or doc string. 
Fields can have default values. + Struct(Struct), + /// A list is a collection of values with some element type. + /// The element field has an integer id that is unique in the table schema. + /// Elements can be either optional or required. Element types may be any type. + List(Vec<Option<Value>>), + /// A map is a collection of key-value pairs with a key type and a value type. + /// Both the key field and value field each have an integer id that is unique in the table schema. + /// Map keys are required and map values can be either optional or required. Both map keys and map values may be any type, including nested types. + Map(HashMap<String, Option<Value>>), +} + +impl TryFrom<Value> for ByteBuf { + type Error = Error; + fn try_from(value: Value) -> Result<Self, Self::Error> { + match value { + Value::Boolean(val) => { + if val { + Ok(ByteBuf::from([0u8])) + } else { + Ok(ByteBuf::from([1u8])) + } + } + Value::Int(val) => Ok(ByteBuf::from(val.to_le_bytes())), + Value::Long(val) => Ok(ByteBuf::from(val.to_le_bytes())), + Value::Float(val) => Ok(ByteBuf::from(val.to_le_bytes())), + Value::Double(val) => Ok(ByteBuf::from(val.to_le_bytes())), + Value::Date(val) => Ok(ByteBuf::from(date::date_to_days(&val)?.to_le_bytes())), + Value::Time(val) => Ok(ByteBuf::from( + time::time_to_microseconds(&val)?.to_le_bytes(), + )), + Value::Timestamp(val) => Ok(ByteBuf::from( + timestamp::datetime_to_microseconds(&val)?.to_le_bytes(), + )), + Value::TimestampTZ(val) => Ok(ByteBuf::from( + timestamptz::datetimetz_to_microseconds(&val)?.to_le_bytes(), + )), + Value::String(val) => Ok(ByteBuf::from(val.as_bytes())), + Value::UUID(val) => Ok(ByteBuf::from(val.as_u128().to_be_bytes())), + Value::Fixed(_, val) => Ok(ByteBuf::from(val)), + Value::Binary(val) => Ok(ByteBuf::from(val)), + _ => todo!(), + } + } +} + +/// The partition struct stores the tuple of partition values for each file. 
+/// Its type is derived from the partition fields of the partition spec used to write the manifest file. +/// In v2, the partition struct’s field ids must match the ids from the partition spec. +#[derive(Debug, Clone, PartialEq)] +pub struct Struct { + /// Vector to store the field values + fields: Vec<Option<Value>>, + /// A lookup that matches the field name to the entry in the vector + lookup: HashMap<String, usize>, +} + +impl Deref for Struct { + type Target = [Option<Value>]; + + fn deref(&self) -> &Self::Target { + &self.fields + } +} + +impl Struct { + /// Get reference to partition value + pub fn get(&self, name: &str) -> Option<&Value> { + self.fields + .get(*self.lookup.get(name)?) + .and_then(|x| x.as_ref()) + } + /// Get mutable reference to partition value + pub fn get_mut(&mut self, name: &str) -> Option<&mut Value> { + self.fields + .get_mut(*self.lookup.get(name)?) + .and_then(|x| x.as_mut()) + } +} + +impl Serialize for Struct { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut record = serializer.serialize_struct("r102", self.fields.len())?; + for (i, value) in self.fields.iter().enumerate() { + let (key, _) = self.lookup.iter().find(|(_, value)| **value == i).unwrap(); + record.serialize_field(Box::leak(key.clone().into_boxed_str()), value)?; + } + record.end() + } +} + +impl<'de> Deserialize<'de> for Struct { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + struct PartitionStructVisitor; + + impl<'de> Visitor<'de> for PartitionStructVisitor { + type Value = Struct; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("map") + } + + fn visit_map<V>(self, mut map: V) -> Result<Struct, V::Error> + where + V: MapAccess<'de>, + { + let mut fields: Vec<Option<Value>> = Vec::new(); + let mut lookup: HashMap<String, usize> = HashMap::new(); + let mut index = 0; + while let Some(key) = 
map.next_key()? { + fields.push(map.next_value()?); + lookup.insert(key, index); + index += 1; + } + Ok(Struct { fields, lookup }) + } + } + deserializer.deserialize_struct( + "r102", + Box::leak(vec![].into_boxed_slice()), + PartitionStructVisitor, + ) + } +} + +impl FromIterator<(String, Option<Value>)> for Struct { + fn from_iter<I: IntoIterator<Item = (String, Option<Value>)>>(iter: I) -> Self { + let mut fields = Vec::new(); + let mut lookup = HashMap::new(); + + for (i, (key, value)) in iter.into_iter().enumerate() { + fields.push(value); + lookup.insert(key, i); + } + + Struct { fields, lookup } + } +} + +impl Value { + #[inline] + /// Create iceberg value from bytes + pub fn try_from_bytes(bytes: &[u8], data_type: &Type) -> Result<Self, Error> { + match data_type { + Type::Primitive(primitive) => match primitive { + PrimitiveType::Boolean => { + if bytes.len() == 1 && bytes[0] == 0u8 { + Ok(Value::Boolean(false)) + } else { + Ok(Value::Boolean(true)) + } + } + PrimitiveType::Int => Ok(Value::Int(i32::from_le_bytes(bytes.try_into()?))), + PrimitiveType::Long => Ok(Value::Long(i64::from_le_bytes(bytes.try_into()?))), + PrimitiveType::Float => Ok(Value::Float(OrderedFloat(f32::from_le_bytes( + bytes.try_into()?, + )))), + PrimitiveType::Double => Ok(Value::Double(OrderedFloat(f64::from_le_bytes( + bytes.try_into()?, + )))), + PrimitiveType::Date => Ok(Value::Date(date::days_to_date(i32::from_le_bytes( + bytes.try_into()?, + ))?)), + PrimitiveType::Time => Ok(Value::Time(time::microseconds_to_time( + i64::from_le_bytes(bytes.try_into()?), + )?)), + PrimitiveType::Timestamp => Ok(Value::Timestamp( + timestamp::microseconds_to_datetime(i64::from_le_bytes(bytes.try_into()?))?, + )), + PrimitiveType::Timestamptz => Ok(Value::TimestampTZ( + timestamptz::microseconds_to_datetimetz(i64::from_le_bytes(bytes.try_into()?))?, + )), + PrimitiveType::String => Ok(Value::String(std::str::from_utf8(bytes)?.to_string())), + PrimitiveType::Uuid => 
Ok(Value::UUID(Uuid::from_u128(u128::from_be_bytes( + bytes.try_into()?, + )))), + PrimitiveType::Fixed(len) => Ok(Value::Fixed(*len as usize, Vec::from(bytes))), + PrimitiveType::Binary => Ok(Value::Binary(Vec::from(bytes))), + _ => Err(Error::new( + crate::ErrorKind::DataInvalid, + "Converting bytes to decimal is not supported.", + )), + }, + _ => Err(Error::new( + crate::ErrorKind::DataInvalid, + "Converting bytes to non-primitive types is not supported.", + )), + } + } + + /// Get datatype of value + pub fn datatype(&self) -> Type { + match self { + Value::Boolean(_) => Type::Primitive(PrimitiveType::Boolean), + Value::Int(_) => Type::Primitive(PrimitiveType::Int), + Value::Long(_) => Type::Primitive(PrimitiveType::Long), + Value::Float(_) => Type::Primitive(PrimitiveType::Float), + Value::Double(_) => Type::Primitive(PrimitiveType::Double), + Value::Date(_) => Type::Primitive(PrimitiveType::Date), + Value::Time(_) => Type::Primitive(PrimitiveType::Time), + Value::Timestamp(_) => Type::Primitive(PrimitiveType::Timestamp), + Value::TimestampTZ(_) => Type::Primitive(PrimitiveType::Timestamptz), + Value::Fixed(len, _) => Type::Primitive(PrimitiveType::Fixed(*len as u64)), + Value::Binary(_) => Type::Primitive(PrimitiveType::Binary), + Value::String(_) => Type::Primitive(PrimitiveType::String), + Value::UUID(_) => Type::Primitive(PrimitiveType::Uuid), + Value::Decimal(dec) => Type::Primitive(PrimitiveType::Decimal { + precision: 38, + scale: dec.scale(), + }), + _ => unimplemented!(), + } + } + + /// Convert Value to the any type + pub fn into_any(self) -> Box<dyn Any> { + match self { + Value::Boolean(any) => Box::new(any), + Value::Int(any) => Box::new(any), + Value::Long(any) => Box::new(any), + Value::Float(any) => Box::new(any), + Value::Double(any) => Box::new(any), + Value::Date(any) => Box::new(any), + Value::Time(any) => Box::new(any), + Value::Timestamp(any) => Box::new(any), + Value::TimestampTZ(any) => Box::new(any), + Value::Fixed(_, any) => 
Box::new(any), + Value::Binary(any) => Box::new(any), + Value::String(any) => Box::new(any), + Value::UUID(any) => Box::new(any), + Value::Decimal(any) => Box::new(any), + _ => unimplemented!(), + } + } +} + +mod float { + use ordered_float::OrderedFloat; + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + + pub fn serialize<S>(value: &OrderedFloat<f32>, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + value.serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result<OrderedFloat<f32>, D::Error> + where + D: Deserializer<'de>, + { + f32::deserialize(deserializer).map(OrderedFloat) + } +} + +mod double { + use ordered_float::OrderedFloat; + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + + pub fn serialize<S>(value: &OrderedFloat<f64>, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + value.serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result<OrderedFloat<f64>, D::Error> + where + D: Deserializer<'de>, + { + f64::deserialize(deserializer).map(OrderedFloat) + } +} + +mod date { + use chrono::NaiveDate; + use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer}; + + use crate::Error; + + pub fn serialize<S>(value: &NaiveDate, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let days = date_to_days(value).map_err(|err| ser::Error::custom(err.to_string()))?; + days.serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result<NaiveDate, D::Error> + where + D: Deserializer<'de>, + { + let days = i32::deserialize(deserializer)?; + + days_to_date(days).map_err(|err| de::Error::custom(err.to_string())) + } + + pub(crate) fn date_to_days(date: &NaiveDate) -> Result<i32, Error> { + Ok(date + .signed_duration_since(NaiveDate::from_ymd_opt(1970, 0, 0).ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to get time from midnight", + ))?) 
+ .num_days() as i32) + } + + pub(crate) fn days_to_date(days: i32) -> Result<NaiveDate, Error> { + NaiveDate::from_num_days_from_ce_opt(days).ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to convert microseconds to time", + )) + } +} + +mod time { + use chrono::NaiveTime; + use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer}; + + use crate::Error; + + pub fn serialize<S>(value: &NaiveTime, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let micros = + time_to_microseconds(value).map_err(|err| ser::Error::custom(err.to_string()))?; + micros.serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result<NaiveTime, D::Error> + where + D: Deserializer<'de>, + { + let micros = i64::deserialize(deserializer)?; + + microseconds_to_time(micros).map_err(|err| de::Error::custom(err.to_string())) + } + + pub(crate) fn time_to_microseconds(time: &NaiveTime) -> Result<i64, Error> { + time.signed_duration_since(NaiveTime::from_num_seconds_from_midnight_opt(0, 0).ok_or( + Error::new( + crate::ErrorKind::DataInvalid, + "Failed to get time from midnight", + ), + )?) 
+ .num_microseconds() + .ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to convert time to microseconds", + )) + } + + pub(crate) fn microseconds_to_time(micros: i64) -> Result<NaiveTime, Error> { + let (secs, rem) = (micros / 1_000_000, micros % 1_000_000); + + NaiveTime::from_num_seconds_from_midnight_opt(secs as u32, rem as u32 * 1000).ok_or( + Error::new( + crate::ErrorKind::DataInvalid, + "Failed to convert microseconds to time", + ), + ) + } +} + +mod timestamp { + use chrono::NaiveDateTime; + use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer}; + + use crate::Error; + + pub fn serialize<S>(value: &NaiveDateTime, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let micros = + datetime_to_microseconds(value).map_err(|err| ser::Error::custom(err.to_string()))?; + micros.serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result<NaiveDateTime, D::Error> + where + D: Deserializer<'de>, + { + let micros = i64::deserialize(deserializer)?; + + microseconds_to_datetime(micros).map_err(|err| de::Error::custom(err.to_string())) + } + + pub(crate) fn datetime_to_microseconds(time: &NaiveDateTime) -> Result<i64, Error> { + time.signed_duration_since(NaiveDateTime::from_timestamp_opt(0, 0).ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to get time from midnight", + ))?) 
+ .num_microseconds() + .ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to convert time to microseconds", + )) + } + + pub(crate) fn microseconds_to_datetime(micros: i64) -> Result<NaiveDateTime, Error> { + let (secs, rem) = (micros / 1_000_000, micros % 1_000_000); + + NaiveDateTime::from_timestamp_opt(secs, rem as u32 * 1000).ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to convert microseconds to time", + )) + } +} + +mod timestamptz { + use chrono::{DateTime, NaiveDateTime, Utc}; + use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer}; + + use crate::Error; + + pub fn serialize<S>(value: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let micros = + datetimetz_to_microseconds(value).map_err(|err| ser::Error::custom(err.to_string()))?; + micros.serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error> + where + D: Deserializer<'de>, + { + let micros = i64::deserialize(deserializer)?; + + microseconds_to_datetimetz(micros).map_err(|err| de::Error::custom(err.to_string())) + } + + pub(crate) fn datetimetz_to_microseconds(time: &DateTime<Utc>) -> Result<i64, Error> { + time.signed_duration_since(DateTime::<Utc>::from_utc( + NaiveDateTime::from_timestamp_opt(0, 0).ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to get time from midnight", + ))?, + Utc, + )) + .num_microseconds() + .ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to convert time to microseconds", + )) + } + + pub(crate) fn microseconds_to_datetimetz(micros: i64) -> Result<DateTime<Utc>, Error> { + let (secs, rem) = (micros / 1_000_000, micros % 1_000_000); + + Ok(DateTime::<Utc>::from_utc( + NaiveDateTime::from_timestamp_opt(secs, rem as u32 * 1000).ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to convert microseconds to time", + ))?, + Utc, + )) + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + pub fn 
boolean() { + let input = Value::Boolean(true); + + let raw_schema = r#""boolean""#; + + let schema = apache_avro::Schema::parse_str(raw_schema).unwrap(); + + let mut writer = apache_avro::Writer::new(&schema, Vec::new()); + + writer.append_ser(input.clone()).unwrap(); + + let encoded = writer.into_inner().unwrap(); + + let reader = apache_avro::Reader::new(&*encoded).unwrap(); + + for record in reader { + let result = apache_avro::from_value::<Value>(&record.unwrap()).unwrap(); + assert_eq!(input, result); + } + } + + #[test] + pub fn int() { + let input = Value::Int(42); + + let raw_schema = r#""int""#; + + let schema = apache_avro::Schema::parse_str(raw_schema).unwrap(); + + let mut writer = apache_avro::Writer::new(&schema, Vec::new()); + + writer.append_ser(input.clone()).unwrap(); + + let encoded = writer.into_inner().unwrap(); + + let reader = apache_avro::Reader::new(&*encoded).unwrap(); + + for record in reader { + let result = apache_avro::from_value::<Value>(&record.unwrap()).unwrap(); + assert_eq!(input, result); + } + } + + #[test] + pub fn float() { + let input = Value::Float(OrderedFloat(42.0)); + + let raw_schema = r#""float""#; + + let schema = apache_avro::Schema::parse_str(raw_schema).unwrap(); + + let mut writer = apache_avro::Writer::new(&schema, Vec::new()); + + writer.append_ser(input.clone()).unwrap(); + + let encoded = writer.into_inner().unwrap(); + + let reader = apache_avro::Reader::new(&*encoded).unwrap(); + + for record in reader { + let result = apache_avro::from_value::<Value>(&record.unwrap()).unwrap(); + assert_eq!(input, result); + } + } + + #[test] + pub fn string() { + let input = Value::String("test".to_string()); + + let raw_schema = r#""string""#; + + let schema = apache_avro::Schema::parse_str(raw_schema).unwrap(); + + let mut writer = apache_avro::Writer::new(&schema, Vec::new()); + + writer.append_ser(input.clone()).unwrap(); + + let encoded = writer.into_inner().unwrap(); + + let reader = 
apache_avro::Reader::new(&*encoded).unwrap(); + + for record in reader { + let result = apache_avro::from_value::<Value>(&record.unwrap()).unwrap(); + assert_eq!(input, result); + } + } + + #[test] + pub fn struct_value() { + let input = Value::Struct(Struct::from_iter(vec![( + "name".to_string(), + Some(Value::String("Alice".to_string())), + )])); + + let raw_schema = r#"{"type": "record","name": "r102","fields": [{ Review Comment: I'm a bit confused here, are we serializing an Avro or an Iceberg schema here? This is an Avro schema. The Iceberg equivalent would be: ```python Python 3.9.6 (default, May 7 2023, 23:32:44) [Clang 14.0.3 (clang-1403.0.22.14.1)] on darwin Type "help", "copyright", "credits" or "license" for more information. >>> from pyiceberg.types import * >>> StructType(NestedField(1, "name", StringType())).json() '{"type": "struct", "fields": [{"id": 1, "name": "name", "type": "string", "required": true}]}' ``` When I extract the `partition` field with `avro-tools`: ``` { "partition": { "tpep_pickup_datetime_day": { "int": 19088 } } } ``` So in Python, I went all the way. What I did there is implement my own Avro reader that takes the Iceberg schema as input. I'm not saying that you should do that here, but it would maybe be helpful to implement a visitor to convert an Iceberg schema to an Avro schema. I would say that the `partition` would read into a `Vec<Any>`. ########## crates/iceberg/src/spec/values.rs: ########## @@ -0,0 +1,663 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/*! + * Value in iceberg + */ + +use std::{any::Any, collections::HashMap, fmt, ops::Deref}; + +use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc}; +use ordered_float::OrderedFloat; +use rust_decimal::Decimal; +use serde::{ + de::{MapAccess, Visitor}, + ser::SerializeStruct, + Deserialize, Deserializer, Serialize, +}; +use serde_bytes::ByteBuf; +use uuid::Uuid; + +use crate::Error; + +use super::datatypes::{PrimitiveType, Type}; + +/// Values present in iceberg type +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +#[serde(untagged)] +pub enum Value { + /// 0x00 for false, non-zero byte for true + Boolean(bool), + /// Stored as 4-byte little-endian + Int(i32), + /// Stored as 8-byte little-endian + Long(i64), + /// Stored as 4-byte little-endian + Float(#[serde(with = "float")] OrderedFloat<f32>), + /// Stored as 8-byte little-endian + Double(#[serde(with = "double")] OrderedFloat<f64>), + /// Stores days from the 1970-01-01 in an 4-byte little-endian int + Date(#[serde(with = "date")] NaiveDate), + /// Stores microseconds from midnight in an 8-byte little-endian long + Time(#[serde(with = "time")] NaiveTime), + /// Timestamp without timezone + Timestamp(#[serde(with = "timestamp")] NaiveDateTime), + /// Timestamp with timezone + TimestampTZ(#[serde(with = "timestamptz")] DateTime<Utc>), + /// UTF-8 bytes (without length) + String(String), + /// 16-byte big-endian value + UUID(Uuid), + /// Binary value + Fixed(usize, Vec<u8>), + /// Binary value (without length) + Binary(Vec<u8>), + /// Stores unscaled value as 
two’s-complement big-endian binary, + /// using the minimum number of bytes for the value + Decimal(Decimal), + /// A struct is a tuple of typed values. Each field in the tuple is named and has an integer id that is unique in the table schema. + /// Each field can be either optional or required, meaning that values can (or cannot) be null. Fields may be any type. + /// Fields may have an optional comment or doc string. Fields can have default values. + Struct(Struct), + /// A list is a collection of values with some element type. + /// The element field has an integer id that is unique in the table schema. + /// Elements can be either optional or required. Element types may be any type. + List(Vec<Option<Value>>), + /// A map is a collection of key-value pairs with a key type and a value type. + /// Both the key field and value field each have an integer id that is unique in the table schema. + /// Map keys are required and map values can be either optional or required. Both map keys and map values may be any type, including nested types. 
+ Map(HashMap<String, Option<Value>>), +} + +impl TryFrom<Value> for ByteBuf { + type Error = Error; + fn try_from(value: Value) -> Result<Self, Self::Error> { + match value { + Value::Boolean(val) => { + if val { + Ok(ByteBuf::from([0u8])) + } else { + Ok(ByteBuf::from([1u8])) + } + } + Value::Int(val) => Ok(ByteBuf::from(val.to_le_bytes())), + Value::Long(val) => Ok(ByteBuf::from(val.to_le_bytes())), + Value::Float(val) => Ok(ByteBuf::from(val.to_le_bytes())), + Value::Double(val) => Ok(ByteBuf::from(val.to_le_bytes())), + Value::Date(val) => Ok(ByteBuf::from(date::date_to_days(&val)?.to_le_bytes())), + Value::Time(val) => Ok(ByteBuf::from( + time::time_to_microseconds(&val)?.to_le_bytes(), + )), + Value::Timestamp(val) => Ok(ByteBuf::from( + timestamp::datetime_to_microseconds(&val)?.to_le_bytes(), + )), + Value::TimestampTZ(val) => Ok(ByteBuf::from( + timestamptz::datetimetz_to_microseconds(&val)?.to_le_bytes(), + )), + Value::String(val) => Ok(ByteBuf::from(val.as_bytes())), + Value::UUID(val) => Ok(ByteBuf::from(val.as_u128().to_be_bytes())), + Value::Fixed(_, val) => Ok(ByteBuf::from(val)), + Value::Binary(val) => Ok(ByteBuf::from(val)), + _ => todo!(), + } + } +} + +/// The partition struct stores the tuple of partition values for each file. +/// Its type is derived from the partition fields of the partition spec used to write the manifest file. +/// In v2, the partition struct’s field ids must match the ids from the partition spec. +#[derive(Debug, Clone, PartialEq)] +pub struct Struct { + /// Vector to store the field values + fields: Vec<Option<Value>>, + /// A lookup that matches the field name to the entry in the vector + lookup: HashMap<String, usize>, +} + +impl Deref for Struct { + type Target = [Option<Value>]; + + fn deref(&self) -> &Self::Target { + &self.fields + } +} + +impl Struct { + /// Get reference to partition value + pub fn get(&self, name: &str) -> Option<&Value> { + self.fields + .get(*self.lookup.get(name)?) 
+ .and_then(|x| x.as_ref()) + } + /// Get mutable reference to partition value + pub fn get_mut(&mut self, name: &str) -> Option<&mut Value> { + self.fields + .get_mut(*self.lookup.get(name)?) + .and_then(|x| x.as_mut()) + } +} + +impl Serialize for Struct { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut record = serializer.serialize_struct("r102", self.fields.len())?; + for (i, value) in self.fields.iter().enumerate() { + let (key, _) = self.lookup.iter().find(|(_, value)| **value == i).unwrap(); + record.serialize_field(Box::leak(key.clone().into_boxed_str()), value)?; + } + record.end() + } +} + +impl<'de> Deserialize<'de> for Struct { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: Deserializer<'de>, + { + struct PartitionStructVisitor; + + impl<'de> Visitor<'de> for PartitionStructVisitor { + type Value = Struct; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("map") + } + + fn visit_map<V>(self, mut map: V) -> Result<Struct, V::Error> + where + V: MapAccess<'de>, + { + let mut fields: Vec<Option<Value>> = Vec::new(); + let mut lookup: HashMap<String, usize> = HashMap::new(); + let mut index = 0; + while let Some(key) = map.next_key()? 
{ + fields.push(map.next_value()?); + lookup.insert(key, index); + index += 1; + } + Ok(Struct { fields, lookup }) + } + } + deserializer.deserialize_struct( + "r102", + Box::leak(vec![].into_boxed_slice()), + PartitionStructVisitor, + ) + } +} + +impl FromIterator<(String, Option<Value>)> for Struct { + fn from_iter<I: IntoIterator<Item = (String, Option<Value>)>>(iter: I) -> Self { + let mut fields = Vec::new(); + let mut lookup = HashMap::new(); + + for (i, (key, value)) in iter.into_iter().enumerate() { + fields.push(value); + lookup.insert(key, i); + } + + Struct { fields, lookup } + } +} + +impl Value { + #[inline] + /// Create iceberg value from bytes + pub fn try_from_bytes(bytes: &[u8], data_type: &Type) -> Result<Self, Error> { + match data_type { + Type::Primitive(primitive) => match primitive { + PrimitiveType::Boolean => { + if bytes.len() == 1 && bytes[0] == 0u8 { + Ok(Value::Boolean(false)) + } else { + Ok(Value::Boolean(true)) + } + } + PrimitiveType::Int => Ok(Value::Int(i32::from_le_bytes(bytes.try_into()?))), + PrimitiveType::Long => Ok(Value::Long(i64::from_le_bytes(bytes.try_into()?))), + PrimitiveType::Float => Ok(Value::Float(OrderedFloat(f32::from_le_bytes( + bytes.try_into()?, + )))), + PrimitiveType::Double => Ok(Value::Double(OrderedFloat(f64::from_le_bytes( + bytes.try_into()?, + )))), + PrimitiveType::Date => Ok(Value::Date(date::days_to_date(i32::from_le_bytes( + bytes.try_into()?, + ))?)), + PrimitiveType::Time => Ok(Value::Time(time::microseconds_to_time( + i64::from_le_bytes(bytes.try_into()?), + )?)), + PrimitiveType::Timestamp => Ok(Value::Timestamp( + timestamp::microseconds_to_datetime(i64::from_le_bytes(bytes.try_into()?))?, + )), + PrimitiveType::Timestamptz => Ok(Value::TimestampTZ( + timestamptz::microseconds_to_datetimetz(i64::from_le_bytes(bytes.try_into()?))?, + )), + PrimitiveType::String => Ok(Value::String(std::str::from_utf8(bytes)?.to_string())), + PrimitiveType::Uuid => 
Ok(Value::UUID(Uuid::from_u128(u128::from_be_bytes( + bytes.try_into()?, + )))), + PrimitiveType::Fixed(len) => Ok(Value::Fixed(*len as usize, Vec::from(bytes))), + PrimitiveType::Binary => Ok(Value::Binary(Vec::from(bytes))), + _ => Err(Error::new( + crate::ErrorKind::DataInvalid, + "Converting bytes to decimal is not supported.", + )), + }, + _ => Err(Error::new( + crate::ErrorKind::DataInvalid, + "Converting bytes to non-primitive types is not supported.", + )), + } + } + + /// Get datatype of value + pub fn datatype(&self) -> Type { + match self { + Value::Boolean(_) => Type::Primitive(PrimitiveType::Boolean), + Value::Int(_) => Type::Primitive(PrimitiveType::Int), + Value::Long(_) => Type::Primitive(PrimitiveType::Long), + Value::Float(_) => Type::Primitive(PrimitiveType::Float), + Value::Double(_) => Type::Primitive(PrimitiveType::Double), + Value::Date(_) => Type::Primitive(PrimitiveType::Date), + Value::Time(_) => Type::Primitive(PrimitiveType::Time), + Value::Timestamp(_) => Type::Primitive(PrimitiveType::Timestamp), + Value::TimestampTZ(_) => Type::Primitive(PrimitiveType::Timestamptz), + Value::Fixed(len, _) => Type::Primitive(PrimitiveType::Fixed(*len as u64)), + Value::Binary(_) => Type::Primitive(PrimitiveType::Binary), + Value::String(_) => Type::Primitive(PrimitiveType::String), + Value::UUID(_) => Type::Primitive(PrimitiveType::Uuid), + Value::Decimal(dec) => Type::Primitive(PrimitiveType::Decimal { + precision: 38, + scale: dec.scale(), + }), + _ => unimplemented!(), + } + } + + /// Convert Value to the any type + pub fn into_any(self) -> Box<dyn Any> { + match self { + Value::Boolean(any) => Box::new(any), + Value::Int(any) => Box::new(any), + Value::Long(any) => Box::new(any), + Value::Float(any) => Box::new(any), + Value::Double(any) => Box::new(any), + Value::Date(any) => Box::new(any), + Value::Time(any) => Box::new(any), + Value::Timestamp(any) => Box::new(any), + Value::TimestampTZ(any) => Box::new(any), + Value::Fixed(_, any) => 
Box::new(any), + Value::Binary(any) => Box::new(any), + Value::String(any) => Box::new(any), + Value::UUID(any) => Box::new(any), + Value::Decimal(any) => Box::new(any), + _ => unimplemented!(), + } + } +} + +mod float { + use ordered_float::OrderedFloat; + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + + pub fn serialize<S>(value: &OrderedFloat<f32>, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + value.serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result<OrderedFloat<f32>, D::Error> + where + D: Deserializer<'de>, + { + f32::deserialize(deserializer).map(OrderedFloat) + } +} + +mod double { + use ordered_float::OrderedFloat; + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + + pub fn serialize<S>(value: &OrderedFloat<f64>, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + value.serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result<OrderedFloat<f64>, D::Error> + where + D: Deserializer<'de>, + { + f64::deserialize(deserializer).map(OrderedFloat) + } +} + +mod date { + use chrono::NaiveDate; + use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer}; + + use crate::Error; + + pub fn serialize<S>(value: &NaiveDate, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let days = date_to_days(value).map_err(|err| ser::Error::custom(err.to_string()))?; + days.serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result<NaiveDate, D::Error> + where + D: Deserializer<'de>, + { + let days = i32::deserialize(deserializer)?; + + days_to_date(days).map_err(|err| de::Error::custom(err.to_string())) + } + + pub(crate) fn date_to_days(date: &NaiveDate) -> Result<i32, Error> { + Ok(date + .signed_duration_since(NaiveDate::from_ymd_opt(1970, 0, 0).ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to get time from midnight", + ))?) 
+ .num_days() as i32) + } + + pub(crate) fn days_to_date(days: i32) -> Result<NaiveDate, Error> { + NaiveDate::from_num_days_from_ce_opt(days).ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to convert microseconds to time", + )) + } +} + +mod time { + use chrono::NaiveTime; + use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer}; + + use crate::Error; + + pub fn serialize<S>(value: &NaiveTime, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let micros = + time_to_microseconds(value).map_err(|err| ser::Error::custom(err.to_string()))?; + micros.serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result<NaiveTime, D::Error> + where + D: Deserializer<'de>, + { + let micros = i64::deserialize(deserializer)?; + + microseconds_to_time(micros).map_err(|err| de::Error::custom(err.to_string())) + } + + pub(crate) fn time_to_microseconds(time: &NaiveTime) -> Result<i64, Error> { + time.signed_duration_since(NaiveTime::from_num_seconds_from_midnight_opt(0, 0).ok_or( + Error::new( + crate::ErrorKind::DataInvalid, + "Failed to get time from midnight", + ), + )?) 
+ .num_microseconds() + .ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to convert time to microseconds", + )) + } + + pub(crate) fn microseconds_to_time(micros: i64) -> Result<NaiveTime, Error> { + let (secs, rem) = (micros / 1_000_000, micros % 1_000_000); + + NaiveTime::from_num_seconds_from_midnight_opt(secs as u32, rem as u32 * 1000).ok_or( + Error::new( + crate::ErrorKind::DataInvalid, + "Failed to convert microseconds to time", + ), + ) + } +} + +mod timestamp { + use chrono::NaiveDateTime; + use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer}; + + use crate::Error; + + pub fn serialize<S>(value: &NaiveDateTime, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let micros = + datetime_to_microseconds(value).map_err(|err| ser::Error::custom(err.to_string()))?; + micros.serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result<NaiveDateTime, D::Error> + where + D: Deserializer<'de>, + { + let micros = i64::deserialize(deserializer)?; + + microseconds_to_datetime(micros).map_err(|err| de::Error::custom(err.to_string())) + } + + pub(crate) fn datetime_to_microseconds(time: &NaiveDateTime) -> Result<i64, Error> { + time.signed_duration_since(NaiveDateTime::from_timestamp_opt(0, 0).ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to get time from midnight", + ))?) 
+ .num_microseconds() + .ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to convert time to microseconds", + )) + } + + pub(crate) fn microseconds_to_datetime(micros: i64) -> Result<NaiveDateTime, Error> { + let (secs, rem) = (micros / 1_000_000, micros % 1_000_000); + + NaiveDateTime::from_timestamp_opt(secs, rem as u32 * 1000).ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to convert microseconds to time", + )) + } +} + +mod timestamptz { + use chrono::{DateTime, NaiveDateTime, Utc}; + use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer}; + + use crate::Error; + + pub fn serialize<S>(value: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let micros = + datetimetz_to_microseconds(value).map_err(|err| ser::Error::custom(err.to_string()))?; + micros.serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error> + where + D: Deserializer<'de>, + { + let micros = i64::deserialize(deserializer)?; + + microseconds_to_datetimetz(micros).map_err(|err| de::Error::custom(err.to_string())) + } + + pub(crate) fn datetimetz_to_microseconds(time: &DateTime<Utc>) -> Result<i64, Error> { + time.signed_duration_since(DateTime::<Utc>::from_utc( + NaiveDateTime::from_timestamp_opt(0, 0).ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to get time from midnight", + ))?, + Utc, + )) + .num_microseconds() + .ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to convert time to microseconds", + )) + } + + pub(crate) fn microseconds_to_datetimetz(micros: i64) -> Result<DateTime<Utc>, Error> { + let (secs, rem) = (micros / 1_000_000, micros % 1_000_000); + + Ok(DateTime::<Utc>::from_utc( + NaiveDateTime::from_timestamp_opt(secs, rem as u32 * 1000).ok_or(Error::new( + crate::ErrorKind::DataInvalid, + "Failed to convert microseconds to time", + ))?, + Utc, + )) + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + pub fn 
boolean() { + let input = Value::Boolean(true); + + let raw_schema = r#""boolean""#; + + let schema = apache_avro::Schema::parse_str(raw_schema).unwrap(); + + let mut writer = apache_avro::Writer::new(&schema, Vec::new()); + + writer.append_ser(input.clone()).unwrap(); + + let encoded = writer.into_inner().unwrap(); + + let reader = apache_avro::Reader::new(&*encoded).unwrap(); + + for record in reader { + let result = apache_avro::from_value::<Value>(&record.unwrap()).unwrap(); + assert_eq!(input, result); + } + } + + #[test] + pub fn int() { + let input = Value::Int(42); + + let raw_schema = r#""int""#; + + let schema = apache_avro::Schema::parse_str(raw_schema).unwrap(); + + let mut writer = apache_avro::Writer::new(&schema, Vec::new()); + + writer.append_ser(input.clone()).unwrap(); + + let encoded = writer.into_inner().unwrap(); + + let reader = apache_avro::Reader::new(&*encoded).unwrap(); + + for record in reader { + let result = apache_avro::from_value::<Value>(&record.unwrap()).unwrap(); + assert_eq!(input, result); + } + } + + #[test] + pub fn float() { + let input = Value::Float(OrderedFloat(42.0)); + + let raw_schema = r#""float""#; + + let schema = apache_avro::Schema::parse_str(raw_schema).unwrap(); + + let mut writer = apache_avro::Writer::new(&schema, Vec::new()); + + writer.append_ser(input.clone()).unwrap(); + + let encoded = writer.into_inner().unwrap(); + + let reader = apache_avro::Reader::new(&*encoded).unwrap(); + + for record in reader { + let result = apache_avro::from_value::<Value>(&record.unwrap()).unwrap(); + assert_eq!(input, result); + } + } + + #[test] + pub fn string() { + let input = Value::String("test".to_string()); + + let raw_schema = r#""string""#; + + let schema = apache_avro::Schema::parse_str(raw_schema).unwrap(); + + let mut writer = apache_avro::Writer::new(&schema, Vec::new()); + + writer.append_ser(input.clone()).unwrap(); + + let encoded = writer.into_inner().unwrap(); + + let reader = 
apache_avro::Reader::new(&*encoded).unwrap(); + + for record in reader { + let result = apache_avro::from_value::<Value>(&record.unwrap()).unwrap(); + assert_eq!(input, result); + } + } + Review Comment: Can you also add tests for the other types? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
