This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 1a186d8 feat: Define schema post order visitor. (#25)
1a186d8 is described below
commit 1a186d86d9d719c937127145d813d59db282ee17
Author: Renjie Liu <[email protected]>
AuthorDate: Thu Aug 10 21:04:44 2023 +0800
feat: Define schema post order visitor. (#25)
* Define schema post order visitor
* Resolve conflicts
---
crates/iceberg/Cargo.toml | 5 +-
crates/iceberg/src/error.rs | 25 +
crates/iceberg/src/spec/datatypes.rs | 102 ++--
crates/iceberg/src/spec/schema.rs | 980 ++++++++++++++++++++++++++++++++++-
crates/iceberg/src/spec/values.rs | 79 +--
5 files changed, 1085 insertions(+), 106 deletions(-)
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index e1b5de2..0b27a56 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -28,7 +28,7 @@ keywords = ["iceberg"]
[dependencies]
apache-avro = "0.15.0"
-serde = "^1.0"
+serde = {version = "^1.0", features = ["rc"]}
serde_bytes = "0.11.8"
serde_json = "^1.0"
serde_derive = "^1.0"
@@ -39,6 +39,9 @@ chrono = "0.4"
uuid = "1.4.1"
ordered-float = "3.7.0"
bitvec = "1.0.1"
+itertools = "0.11"
+bimap = "0.6"
+
[dev-dependencies]
pretty_assertions = "1.4.0"
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index 6cb41db..b2a4ba8 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -241,6 +241,12 @@ impl Error {
pub fn kind(&self) -> ErrorKind {
self.kind
}
+
+    /// Returns the message text carried by this error.
+    #[inline]
+    pub fn message(&self) -> &str {
+        &self.message
+    }
}
macro_rules! define_from_err {
@@ -283,6 +289,25 @@ define_from_err!(
"Failed to convert between uuid und iceberg value"
);
+/// Helper macro to check a data-validity condition.
+///
+/// If `$cond` evaluates to `false`, the enclosing function immediately returns
+/// `Err(Error::new(ErrorKind::DataInvalid, format!(...)))`. The message is built
+/// from the remaining arguments exactly as `format!` would build it, so the
+/// enclosing function must return this crate's `Result`.
+///
+/// Example:
+///
+/// The following example checks `a > 0` and otherwise returns an error.
+/// ```ignore
+/// use iceberg::ensure_data_valid;
+/// ensure_data_valid!(a > 0, "{} is not positive.", a);
+/// ```
+#[macro_export]
+macro_rules! ensure_data_valid {
+    // Everything after the condition is forwarded verbatim to `format!`. This
+    // accepts the old `literal, args...` form as well as a bare format string.
+    ($cond: expr, $($arg:tt)+) => {
+        if !$cond {
+            return Err($crate::error::Error::new(
+                $crate::error::ErrorKind::DataInvalid,
+                format!($($arg)+),
+            ));
+        }
+    };
+}
+
#[cfg(test)]
mod tests {
use anyhow::anyhow;
diff --git a/crates/iceberg/src/spec/datatypes.rs
b/crates/iceberg/src/spec/datatypes.rs
index 5750ab9..9b4c986 100644
--- a/crates/iceberg/src/spec/datatypes.rs
+++ b/crates/iceberg/src/spec/datatypes.rs
@@ -22,13 +22,13 @@ use ::serde::de::{MapAccess, Visitor};
use serde::de::{Error, IntoDeserializer};
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use std::cell::OnceCell;
-use std::slice::Iter;
+use std::sync::Arc;
use std::{collections::HashMap, fmt, ops::Index};
/// Field name for list type.
-const LIST_FILED_NAME: &str = "element";
-const MAP_KEY_FIELD_NAME: &str = "key";
-const MAP_VALUE_FIELD_NAME: &str = "value";
+// NOTE(review): "FILED" is a typo for "FIELD"; renaming it requires updating
+// every user of this crate-visible constant.
+pub(crate) const LIST_FILED_NAME: &str = "element";
+/// Field name for map key type.
+pub(crate) const MAP_KEY_FIELD_NAME: &str = "key";
+/// Field name for map value type.
+pub(crate) const MAP_VALUE_FIELD_NAME: &str = "value";
#[derive(Debug, PartialEq, Eq, Clone)]
/// All data types are either primitives or nested types, which are maps,
lists, or structs.
@@ -54,6 +54,20 @@ impl fmt::Display for Type {
}
}
+impl Type {
+    /// Returns `true` when this type is a [`PrimitiveType`].
+    #[inline]
+    pub fn is_primitive(&self) -> bool {
+        matches!(self, Self::Primitive(..))
+    }
+
+    /// Returns `true` when this type is a [`StructType`].
+    #[inline]
+    pub fn is_struct(&self) -> bool {
+        matches!(self, Self::Struct(..))
+    }
+}
+
/// Primitive data types
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase", remote = "Self")]
@@ -218,7 +232,7 @@ impl fmt::Display for PrimitiveType {
#[serde(rename = "struct", tag = "type")]
pub struct StructType {
/// Struct fields
- fields: Vec<NestedField>,
+ fields: Vec<NestedFieldRef>,
/// Lookup for index by field id
#[serde(skip_serializing)]
id_lookup: OnceCell<HashMap<i32, usize>>,
@@ -261,7 +275,7 @@ impl<'de> Deserialize<'de> for StructType {
}
}
}
- let fields: Vec<NestedField> =
+ let fields: Vec<NestedFieldRef> =
fields.ok_or_else(|| de::Error::missing_field("fields"))?;
Ok(StructType::new(fields))
@@ -275,14 +289,14 @@ impl<'de> Deserialize<'de> for StructType {
impl StructType {
/// Creates a struct type with the given fields.
- pub fn new(fields: Vec<NestedField>) -> Self {
+ pub fn new(fields: Vec<NestedFieldRef>) -> Self {
Self {
fields,
id_lookup: OnceCell::default(),
}
}
/// Get struct field with certain id
- pub fn field_by_id(&self, id: i32) -> Option<&NestedField> {
+ pub fn field_by_id(&self, id: i32) -> Option<&NestedFieldRef> {
self.field_id_to_index(id).map(|idx| &self.fields[idx])
}
@@ -294,9 +308,10 @@ impl StructType {
.get(&field_id)
.copied()
}
- /// Returns an iteratorr over the struct fields
- pub fn iter(&self) -> Iter<NestedField> {
- self.fields.iter()
+
+ /// Get fields.
+ pub fn fields(&self) -> &[NestedFieldRef] {
+ &self.fields
}
}
@@ -352,6 +367,9 @@ pub struct NestedField {
pub write_default: Option<String>,
}
+/// Shared, reference-counted handle to a [`NestedField`].
+///
+/// Cloning a `NestedFieldRef` only bumps the `Arc` reference count, so the same
+/// field can be held by its parent type and by lookup indexes without copying.
+pub type NestedFieldRef = Arc<NestedField>;
+
impl NestedField {
/// Construct a required field.
pub fn required(id: i32, name: impl ToString, field_type: Type) -> Self {
@@ -443,13 +461,15 @@ impl fmt::Display for NestedField {
/// Elements can be either optional or required. Element types may be any type.
pub struct ListType {
/// Element field of list type.
- pub element_field: NestedField,
+ pub element_field: NestedFieldRef,
}
/// Module for type serialization/deserialization.
pub(super) mod _serde {
use crate::spec::datatypes::Type::Map;
- use crate::spec::datatypes::{ListType, MapType, NestedField,
PrimitiveType, StructType, Type};
+ use crate::spec::datatypes::{
+ ListType, MapType, NestedField, NestedFieldRef, PrimitiveType,
StructType, Type,
+ };
use serde_derive::{Deserialize, Serialize};
use std::borrow::Cow;
@@ -466,7 +486,7 @@ pub(super) mod _serde {
},
Struct {
r#type: String,
- fields: Cow<'a, Vec<NestedField>>,
+ fields: Cow<'a, Vec<NestedFieldRef>>,
},
#[serde(rename_all = "kebab-case")]
Map {
@@ -493,7 +513,8 @@ pub(super) mod _serde {
element_id,
element.into_owned(),
element_required,
- ),
+ )
+ .into(),
}),
SerdeType::Map {
r#type: _,
@@ -503,12 +524,13 @@ pub(super) mod _serde {
value_required,
value,
} => Map(MapType {
- key_field: NestedField::map_key_element(key_id,
key.into_owned()),
+ key_field: NestedField::map_key_element(key_id,
key.into_owned()).into(),
value_field: NestedField::map_value_element(
value_id,
value.into_owned(),
value_required,
- ),
+ )
+ .into(),
}),
SerdeType::Struct { r#type: _, fields } => {
Self::Struct(StructType::new(fields.into_owned()))
@@ -552,9 +574,9 @@ pub(super) mod _serde {
/// Both map keys and map values may be any type, including nested types.
pub struct MapType {
/// Field for key.
- pub key_field: NestedField,
+ pub key_field: NestedFieldRef,
/// Field for value.
- pub value_field: NestedField,
+ pub value_field: NestedFieldRef,
}
#[cfg(test)]
@@ -598,7 +620,8 @@ mod tests {
precision: 9,
scale: 2,
}),
- )],
+ )
+ .into()],
id_lookup: OnceCell::default(),
}),
)
@@ -627,7 +650,8 @@ mod tests {
1,
"id",
Type::Primitive(PrimitiveType::Fixed(8)),
- )],
+ )
+ .into()],
id_lookup: OnceCell::default(),
}),
)
@@ -662,8 +686,9 @@ mod tests {
fields: vec![
NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Uuid))
.with_initial_default("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb")
-
.with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae"),
- NestedField::optional(2, "data",
Type::Primitive(PrimitiveType::Int)),
+
.with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae")
+ .into(),
+ NestedField::optional(2, "data",
Type::Primitive(PrimitiveType::Int)).into(),
],
id_lookup: HashMap::from([(1, 0), (2, 1)]).into(),
}),
@@ -725,17 +750,21 @@ mod tests {
let struct_type = Type::Struct(StructType::new(vec![
NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Uuid))
.with_initial_default("0db3e2a8-9d1d-42b9-aa7b-74ebe558dceb")
- .with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae"),
- NestedField::optional(2, "data",
Type::Primitive(PrimitiveType::Int)),
+ .with_write_default("ec5911be-b0a7-458c-8438-c9a3e53cffae")
+ .into(),
+ NestedField::optional(2, "data",
Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(
3,
"address",
Type::Struct(StructType::new(vec![
- NestedField::required(4, "street",
Type::Primitive(PrimitiveType::String)),
- NestedField::optional(5, "province",
Type::Primitive(PrimitiveType::String)),
- NestedField::required(6, "zip",
Type::Primitive(PrimitiveType::Int)),
+ NestedField::required(4, "street",
Type::Primitive(PrimitiveType::String))
+ .into(),
+ NestedField::optional(5, "province",
Type::Primitive(PrimitiveType::String))
+ .into(),
+ NestedField::required(6, "zip",
Type::Primitive(PrimitiveType::Int)).into(),
])),
- ),
+ )
+ .into(),
]));
check_type_serde(record, struct_type)
@@ -759,7 +788,8 @@ mod tests {
3,
Type::Primitive(PrimitiveType::String),
true,
- ),
+ )
+ .into(),
}),
);
}
@@ -780,12 +810,14 @@ mod tests {
check_type_serde(
record,
Type::Map(MapType {
- key_field: NestedField::map_key_element(4,
Type::Primitive(PrimitiveType::String)),
+ key_field: NestedField::map_key_element(4,
Type::Primitive(PrimitiveType::String))
+ .into(),
value_field: NestedField::map_value_element(
5,
Type::Primitive(PrimitiveType::Double),
false,
- ),
+ )
+ .into(),
}),
);
}
@@ -806,12 +838,14 @@ mod tests {
check_type_serde(
record,
Type::Map(MapType {
- key_field: NestedField::map_key_element(4,
Type::Primitive(PrimitiveType::Int)),
+ key_field: NestedField::map_key_element(4,
Type::Primitive(PrimitiveType::Int))
+ .into(),
value_field: NestedField::map_value_element(
5,
Type::Primitive(PrimitiveType::String),
false,
- ),
+ )
+ .into(),
}),
);
}
diff --git a/crates/iceberg/src/spec/schema.rs
b/crates/iceberg/src/spec/schema.rs
index 1e80e7c..e1c6f2c 100644
--- a/crates/iceberg/src/spec/schema.rs
+++ b/crates/iceberg/src/spec/schema.rs
@@ -17,7 +17,17 @@
//! This module defines schema in iceberg.
-use crate::spec::datatypes::{NestedField, StructType};
+use crate::error::Result;
+use crate::spec::datatypes::{
+ ListType, MapType, NestedFieldRef, PrimitiveType, StructType, Type,
LIST_FILED_NAME,
+ MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME,
+};
+use crate::{ensure_data_valid, Error, ErrorKind};
+use bimap::BiHashMap;
+use itertools::Itertools;
+use once_cell::sync::OnceCell;
+use std::collections::{HashMap, HashSet};
+use std::fmt::{Display, Formatter};
const DEFAULT_SCHEMA_ID: i32 = 0;
@@ -27,17 +37,27 @@ pub struct Schema {
r#struct: StructType,
schema_id: i32,
highest_field_id: i32,
+ identifier_field_ids: HashSet<i32>,
+
+ alias_to_id: BiHashMap<String, i32>,
+ id_to_field: HashMap<i32, NestedFieldRef>,
+
+ name_to_id: HashMap<String, i32>,
+ id_to_name: HashMap<i32, String>,
+ lower_case_name_to_id: OnceCell<HashMap<String, i32>>,
}
/// Schema builder.
pub struct SchemaBuilder {
schema_id: i32,
- fields: Vec<NestedField>,
+ /// Top-level fields, in the order they were added.
+ fields: Vec<NestedFieldRef>,
+ /// Alias name <-> field id mapping, moved into the built schema unchanged.
+ alias_to_id: BiHashMap<String, i32>,
+ /// Ids of identifier fields; checked when the schema is built.
+ identifier_field_ids: HashSet<i32>,
}
impl SchemaBuilder {
- /// Add fields to schem builder
- pub fn with_fields(mut self, fields: impl IntoIterator<Item =
NestedField>) -> Self {
+ /// Add fields to schema builder.
+ ///
+ /// Fields are appended after any fields added by earlier calls.
+ pub fn with_fields(mut self, fields: impl IntoIterator<Item =
NestedFieldRef>) -> Self {
self.fields.extend(fields.into_iter());
self
}
@@ -48,14 +68,105 @@ impl SchemaBuilder {
self
}
+    /// Adds identifier field ids.
+    ///
+    /// Ids accumulate across calls; they are only checked when
+    /// [`SchemaBuilder::build`] runs.
+    pub fn with_identifier_field_ids(mut self, ids: impl IntoIterator<Item = i32>) -> Self {
+        for id in ids {
+            self.identifier_field_ids.insert(id);
+        }
+        self
+    }
+
+    /// Sets the alias to field id mapping, replacing any mapping set before.
+    pub fn with_alias(self, alias_to_id: BiHashMap<String, i32>) -> Self {
+        Self { alias_to_id, ..self }
+    }
+
/// Builds the schema.
-    pub fn build(self) -> Schema {
-        let highest_field_id = self.fields.iter().map(|f| f.id).max().unwrap_or(0);
-        Schema {
-            r#struct: StructType::new(self.fields),
+    ///
+    /// # Errors
+    ///
+    /// Returns a `DataInvalid` error when two fields resolve to the same full
+    /// name, or when an identifier field id does not exist, is optional, is not
+    /// a primitive type, is a float/double, or is nested in an optional or
+    /// non-struct field.
+    pub fn build(self) -> Result<Schema> {
+        let r#struct = StructType::new(self.fields);
+        let id_to_field = index_by_id(&r#struct)?;
+
+        // Nested fields (struct members, list elements, map keys and values)
+        // carry their own ids, so the highest id must be taken over the whole
+        // id index rather than over the top-level fields only.
+        let highest_field_id = id_to_field.keys().copied().max().unwrap_or(0);
+
+        Self::validate_identifier_ids(
+            &r#struct,
+            &id_to_field,
+            self.identifier_field_ids.iter().copied(),
+        )?;
+
+        let (name_to_id, id_to_name) = {
+            let mut index = IndexByName::default();
+            visit_struct(&r#struct, &mut index)?;
+            index.indexes()
+        };
+
+        Ok(Schema {
+            r#struct,
             schema_id: self.schema_id,
             highest_field_id,
+            identifier_field_ids: self.identifier_field_ids,
+
+            alias_to_id: self.alias_to_id,
+            id_to_field,
+
+            name_to_id,
+            id_to_name,
+            lower_case_name_to_id: OnceCell::default(),
+        })
+    }
+
+    /// Checks that every id in `identifier_field_ids` may serve as an identifier field.
+    ///
+    /// An identifier field must exist in `id_to_field`, be required, and have a
+    /// primitive type other than `float`/`double`. Every field enclosing it
+    /// must be a required struct field.
+    ///
+    /// # Errors
+    ///
+    /// Returns a `DataInvalid` error describing the first violation found.
+    fn validate_identifier_ids(
+        r#struct: &StructType,
+        id_to_field: &HashMap<i32, NestedFieldRef>,
+        identifier_field_ids: impl Iterator<Item = i32>,
+    ) -> Result<()> {
+        let id_to_parent = index_parents(r#struct);
+        for identifier_field_id in identifier_field_ids {
+            let field = id_to_field.get(&identifier_field_id).ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Cannot add identifier field {identifier_field_id}: field does not exist"
+                    ),
+                )
+            })?;
+            ensure_data_valid!(
+                field.required,
+                "Cannot add identifier field: {} is an optional field",
+                field.name
+            );
+            let Type::Primitive(p) = field.field_type.as_ref() else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Cannot add field {} as an identifier field: not a primitive type field",
+                        field.name
+                    ),
+                ));
+            };
+            ensure_data_valid!(
+                !matches!(p, PrimitiveType::Double | PrimitiveType::Float),
+                "Cannot add identifier field {}: cannot be a float or double type",
+                field.name
+            );
+
+            // Walk up the chain of enclosing fields recorded by `index_parents`;
+            // each one must be a required struct field.
+            let mut cur_field_id = identifier_field_id;
+            while let Some(parent) = id_to_parent.get(&cur_field_id) {
+                let parent_field = id_to_field
+                    .get(parent)
+                    .expect("Field id should not disappear.");
+                ensure_data_valid!(
+                    parent_field.field_type.is_struct(),
+                    "Cannot add field {} as an identifier field: must not be nested in {}",
+                    field.name,
+                    parent_field
+                );
+                ensure_data_valid!(
+                    parent_field.required,
+                    "Cannot add field {} as an identifier field: must not be nested in an optional field {}",
+                    field.name,
+                    parent_field
+                );
+                cur_field_id = *parent;
+            }
         }
+
+        Ok(())
+    }
}
@@ -65,12 +176,30 @@ impl Schema {
SchemaBuilder {
schema_id: DEFAULT_SCHEMA_ID,
fields: vec![],
+ identifier_field_ids: HashSet::default(),
+ alias_to_id: BiHashMap::default(),
}
}
/// Get field by field id.
- pub fn field_by_id(&self, field_id: i32) -> Option<&NestedFieldRef> {
- self.r#struct.field_by_id(field_id)
+ ///
+ /// Nested fields (inside structs, lists and maps) can be looked up as well.
+ pub fn field_by_id(&self, field_id: i32) -> Option<&NestedFieldRef> {
+ self.id_to_field.get(&field_id)
+ }
+
+ /// Get field by field name.
+ ///
+ /// `field_name` may be a full dot-separated name (e.g. `a.element.b`) or a
+ /// short name, which omits the `element`/`value` segment of a list element
+ /// or map value whose type is a struct. Returns `None` for unknown names.
+ pub fn field_by_name(&self, field_name: &str) -> Option<&NestedFieldRef> {
+ self.name_to_id
+ .get(field_name)
+ .and_then(|id| self.field_by_id(*id))
+ }
+
+ /// Get field by alias.
+ ///
+ /// The alias is resolved through the alias <-> id mapping given to the
+ /// builder; returns `None` if the alias or its target field is unknown.
+ pub fn field_by_alias(&self, alias: &str) -> Option<&NestedFieldRef> {
+ self.alias_to_id
+ .get_by_left(alias)
+ .and_then(|id| self.field_by_id(*id))
}
/// Returns [`highest_field_id`].
@@ -84,23 +213,424 @@ impl Schema {
pub fn schema_id(&self) -> i32 {
self.schema_id
}
+
+    /// Returns the top-level struct type of this schema.
+    #[inline]
+    pub fn as_struct(&self) -> &StructType {
+        &self.r#struct
+    }
+
+    /// Get field id by name.
+    ///
+    /// Accepts full dot-separated names as well as the short names indexed for
+    /// this schema; a full name takes precedence over an identical short name.
+    pub fn field_id_by_name(&self, name: &str) -> Option<i32> {
+        self.name_to_id.get(name).copied()
+    }
+
+    /// Get the full dot-separated name of the field with the given id.
+    pub fn name_by_field_id(&self, field_id: i32) -> Option<&str> {
+        self.id_to_name.get(&field_id).map(String::as_str)
+    }
+}
+
+impl Display for Schema {
+    /// Renders the schema as a `table { ... }` block with one top-level field per line.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        writeln!(f, "table {{")?;
+        self.as_struct()
+            .fields()
+            .iter()
+            .try_for_each(|field| writeln!(f, " {}", field))?;
+        writeln!(f, "}}")
+    }
+}
+
+/// A post order schema visitor.
+///
+/// The traversal is driven by [`visit_schema`], [`visit_struct`] and
+/// [`visit_type`]. For each struct field they call `before_struct_field`, visit
+/// the field's type, call `after_struct_field` and then `field`. Once all fields
+/// of a struct are done, `r#struct` receives their results. A list element is
+/// bracketed by `before_list_element`/`after_list_element` and followed by
+/// `list`. A map's key and then its value are bracketed by their
+/// `before_*`/`after_*` hooks and followed by `map`. Primitive types go to
+/// `primitive`, and `visit_schema` finally passes the top-level struct result
+/// to `schema`. The first error returned by any method aborts the traversal.
+pub trait SchemaVisitor {
+ /// Return type of this visitor.
+ type T;
+
+ /// Called before struct field.
+ fn before_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
+ Ok(())
+ }
+ /// Called after struct field.
+ fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
+ Ok(())
+ }
+ /// Called before list element field.
+ fn before_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
+ Ok(())
+ }
+ /// Called after list element field.
+ fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
+ Ok(())
+ }
+ /// Called before map key field.
+ fn before_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
+ Ok(())
+ }
+ /// Called after map key field.
+ fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
+ Ok(())
+ }
+ /// Called before map value field.
+ fn before_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
+ Ok(())
+ }
+ /// Called after map value field.
+ fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
+ Ok(())
+ }
+
+ /// Called after schema's type visited.
+ fn schema(&mut self, schema: &Schema, value: Self::T) -> Result<Self::T>;
+ /// Called after a struct field's type was visited and `after_struct_field` ran.
+ fn field(&mut self, field: &NestedFieldRef, value: Self::T) ->
Result<Self::T>;
+ /// Called after struct's fields visited; `results` holds one `field` result per field, in order.
+ fn r#struct(&mut self, r#struct: &StructType, results: Vec<Self::T>) ->
Result<Self::T>;
+ /// Called after the list's element type was visited.
+ fn list(&mut self, list: &ListType, value: Self::T) -> Result<Self::T>;
+ /// Called after map's key and value types visited; `key_value` is the key result, `value` the value result.
+ fn map(&mut self, map: &MapType, key_value: Self::T, value: Self::T) ->
Result<Self::T>;
+ /// Called when visiting a primitive type.
+ fn primitive(&mut self, p: &PrimitiveType) -> Result<Self::T>;
+}
+
+/// Visits a type in post order.
+///
+/// Primitive types go straight to [`SchemaVisitor::primitive`]. For a list, the
+/// element field is bracketed by `before_list_element`/`after_list_element`
+/// around the recursive visit of its type, and then `list` gets that result.
+/// Maps do the same for the key field and then the value field before `map` is
+/// called. Structs are delegated to [`visit_struct`]. The first error returned
+/// by any visitor method aborts the traversal.
+pub fn visit_type<V: SchemaVisitor>(r#type: &Type, visitor: &mut V) ->
Result<V::T> {
+ match r#type {
+ Type::Primitive(p) => visitor.primitive(p),
+ Type::List(list) => {
+ visitor.before_list_element(&list.element_field)?;
+ let value = visit_type(&list.element_field.field_type, visitor)?;
+ visitor.after_list_element(&list.element_field)?;
+ visitor.list(list, value)
+ }
+ Type::Map(map) => {
+ // The key subtree is fully visited before the value subtree.
+ let key_result = {
+ visitor.before_map_key(&map.key_field)?;
+ let ret = visit_type(&map.key_field.field_type, visitor)?;
+ visitor.after_map_key(&map.key_field)?;
+ ret
+ };
+
+ let value_result = {
+ visitor.before_map_value(&map.value_field)?;
+ let ret = visit_type(&map.value_field.field_type, visitor)?;
+ visitor.after_map_value(&map.value_field)?;
+ ret
+ };
+
+ visitor.map(map, key_result, value_result)
+ }
+ Type::Struct(s) => visit_struct(s, visitor),
+ }
+}
+
+/// Visits every field of a struct type in post order, then the struct itself.
+///
+/// Fields are handled in declaration order. For each one the visitor sees
+/// `before_struct_field`, the field's type, `after_struct_field` and `field`.
+/// The collected `field` results are then handed to the visitor's `r#struct`
+/// method. The first error aborts the traversal.
+pub fn visit_struct<V: SchemaVisitor>(s: &StructType, visitor: &mut V) -> Result<V::T> {
+    let field_results = s
+        .fields()
+        .iter()
+        .map(|field| {
+            visitor.before_struct_field(field)?;
+            let type_result = visit_type(&field.field_type, visitor)?;
+            visitor.after_struct_field(field)?;
+            visitor.field(field, type_result)
+        })
+        .collect::<Result<Vec<_>>>()?;
+    visitor.r#struct(s, field_results)
+}
+
+/// Visits a whole schema in post order.
+///
+/// The schema's top-level struct is visited with [`visit_struct`], and its
+/// result is passed to [`SchemaVisitor::schema`].
+pub fn visit_schema<V: SchemaVisitor>(schema: &Schema, visitor: &mut V) -> Result<V::T> {
+    visit_struct(schema.as_struct(), visitor).and_then(|top| visitor.schema(schema, top))
+}
+
+/// Creates a map from every field id in `r#struct` to its field.
+///
+/// Struct members at any depth are indexed, as are list element fields and
+/// map key and value fields.
+pub fn index_by_id(r#struct: &StructType) -> Result<HashMap<i32, NestedFieldRef>> {
+    struct IndexById {
+        fields_by_id: HashMap<i32, NestedFieldRef>,
+    }
+
+    impl IndexById {
+        /// Stores a shared handle to `field` under its id.
+        fn record(&mut self, field: &NestedFieldRef) {
+            self.fields_by_id.insert(field.id, NestedFieldRef::clone(field));
+        }
+    }
+
+    impl SchemaVisitor for IndexById {
+        type T = ();
+
+        fn schema(&mut self, _schema: &Schema, _value: ()) -> Result<()> {
+            Ok(())
+        }
+
+        fn field(&mut self, field: &NestedFieldRef, _value: ()) -> Result<()> {
+            self.record(field);
+            Ok(())
+        }
+
+        fn r#struct(&mut self, _struct: &StructType, _results: Vec<()>) -> Result<()> {
+            Ok(())
+        }
+
+        fn list(&mut self, list: &ListType, _value: ()) -> Result<()> {
+            self.record(&list.element_field);
+            Ok(())
+        }
+
+        fn map(&mut self, map: &MapType, _key_value: (), _value: ()) -> Result<()> {
+            self.record(&map.key_field);
+            self.record(&map.value_field);
+            Ok(())
+        }
+
+        fn primitive(&mut self, _: &PrimitiveType) -> Result<()> {
+            Ok(())
+        }
+    }
+
+    let mut indexer = IndexById {
+        fields_by_id: HashMap::new(),
+    };
+    visit_struct(r#struct, &mut indexer)?;
+    Ok(indexer.fields_by_id)
+}
+
+/// Creates a map from each field id to the id of its closest enclosing field.
+///
+/// Struct members, list element fields and map key/value fields are all
+/// recorded. Top-level fields of `r#struct` have no enclosing field and are
+/// therefore absent from the result.
+pub fn index_parents(r#struct: &StructType) -> HashMap<i32, i32> {
+    struct IndexByParent {
+        /// Ids of the fields whose subtrees are currently being visited,
+        /// innermost last.
+        parents: Vec<i32>,
+        result: HashMap<i32, i32>,
+    }
+
+    impl IndexByParent {
+        /// Records the innermost field still on the stack as the parent of `child_id`.
+        fn record_parent(&mut self, child_id: i32) {
+            if let Some(parent) = self.parents.last().copied() {
+                self.result.insert(child_id, parent);
+            }
+        }
+    }
+
+    impl SchemaVisitor for IndexByParent {
+        type T = ();
+
+        fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> {
+            self.parents.push(field.id);
+            Ok(())
+        }
+
+        fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
+            self.parents.pop();
+            Ok(())
+        }
+
+        fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
+            self.parents.push(field.id);
+            Ok(())
+        }
+
+        fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> {
+            self.parents.pop();
+            Ok(())
+        }
+
+        fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
+            self.parents.push(field.id);
+            Ok(())
+        }
+
+        fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> {
+            self.parents.pop();
+            Ok(())
+        }
+
+        fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
+            self.parents.push(field.id);
+            Ok(())
+        }
+
+        fn after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> {
+            self.parents.pop();
+            Ok(())
+        }
+
+        fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T> {
+            Ok(())
+        }
+
+        fn field(&mut self, field: &NestedFieldRef, _value: Self::T) -> Result<Self::T> {
+            // `after_struct_field` already popped this field's own id, so the
+            // top of the stack is the enclosing field (if any).
+            self.record_parent(field.id);
+            Ok(())
+        }
+
+        fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) -> Result<Self::T> {
+            Ok(())
+        }
+
+        fn list(&mut self, list: &ListType, _value: Self::T) -> Result<Self::T> {
+            // List elements never go through `field`, so their parent (the field
+            // whose type is this list) must be recorded here. Without it, walking
+            // up from a field nested in a list stops at the element and never
+            // reaches the list field.
+            self.record_parent(list.element_field.id);
+            Ok(())
+        }
+
+        fn map(&mut self, map: &MapType, _key_value: Self::T, _value: Self::T) -> Result<Self::T> {
+            // Same as `list`: map keys and values are not reported via `field`.
+            self.record_parent(map.key_field.id);
+            self.record_parent(map.value_field.id);
+            Ok(())
+        }
+
+        fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
+            Ok(())
+        }
+    }
+
+    let mut index = IndexByParent {
+        parents: vec![],
+        result: HashMap::new(),
+    };
+    visit_struct(r#struct, &mut index).expect("IndexByParent never returns an error");
+    index.result
+}
+
+/// Schema visitor that indexes fields by their dot-separated names.
+///
+/// Two name stacks are maintained during the walk. One holds the full path.
+/// The other holds the "short" path, which leaves out list `element` and map
+/// `value` segments whose type is a struct.
+#[derive(Default)]
+struct IndexByName {
+ // Maybe radix tree is better here?
+ /// Full name -> field id.
+ name_to_id: HashMap<String, i32>,
+ /// Short name -> field id; the first field registered under a short name keeps it.
+ short_name_to_id: HashMap<String, i32>,
+
+ /// Names of the fields currently being visited, outermost first.
+ field_names: Vec<String>,
+ /// Like `field_names`, minus list-element / map-value names of struct type.
+ short_field_names: Vec<String>,
+}
+
+impl IndexByName {
+    /// Registers `field_id` under `name`, prefixed by the names currently on the stacks.
+    ///
+    /// The full name joins `field_names` and `name` with `.`; the short name
+    /// does the same with `short_field_names`.
+    ///
+    /// # Errors
+    ///
+    /// Returns a `DataInvalid` error if another field already owns the same
+    /// full name. Short-name collisions are not errors: the first field
+    /// registered under a short name keeps it.
+    fn add_field(&mut self, name: &str, field_id: i32) -> Result<()> {
+        use std::collections::hash_map::Entry;
+
+        let full_name = self
+            .field_names
+            .iter()
+            .map(String::as_str)
+            .chain(std::iter::once(name))
+            .join(".");
+        // One hash lookup: either reject a duplicate or claim the name.
+        match self.name_to_id.entry(full_name) {
+            Entry::Occupied(existing) => {
+                let full_name = existing.key();
+                let existing_field_id = existing.get();
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}"),
+                ));
+            }
+            Entry::Vacant(slot) => {
+                slot.insert(field_id);
+            }
+        }
+
+        let short_name = self
+            .short_field_names
+            .iter()
+            .map(String::as_str)
+            .chain(std::iter::once(name))
+            .join(".");
+        self.short_name_to_id.entry(short_name).or_insert(field_id);
+        Ok(())
+    }
+
+    /// Consumes the index and returns `(name_to_id, id_to_name)`.
+    ///
+    /// `name_to_id` contains both full names and short names; when a short name
+    /// is identical to some field's full name, the full-name mapping wins.
+    /// `id_to_name` maps every field id to its full name only.
+    pub fn indexes(mut self) -> (HashMap<String, i32>, HashMap<i32, String>) {
+        // Full names are written last so they override clashing short names.
+        self.short_name_to_id.extend(
+            self.name_to_id
+                .iter()
+                .map(|(name, id)| (name.clone(), *id)),
+        );
+        let id_to_name = self
+            .name_to_id
+            .into_iter()
+            .map(|(name, id)| (id, name))
+            .collect();
+        (self.short_name_to_id, id_to_name)
+    }
+}
+
+/// Builds the name indexes. Every `before_*` hook pushes onto the name stacks
+/// and the matching `after_*` hook pops exactly the same entries, so the
+/// stacks always describe the path to the field being visited.
+impl SchemaVisitor for IndexByName {
+ type T = ();
+
+ fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> {
+ // Struct field names always appear in both the full and the short path.
+ self.field_names.push(field.name.to_string());
+ self.short_field_names.push(field.name.to_string());
+ Ok(())
+ }
+
+ fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> {
+ self.field_names.pop();
+ self.short_field_names.pop();
+ Ok(())
+ }
+
+ fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
+ self.field_names.push(field.name.clone());
+ // A struct-typed element is left out of the short path, so e.g.
+ // `points.element.x` is also reachable as `points.x`.
+ if !field.field_type.is_struct() {
+ self.short_field_names.push(field.name.to_string());
+ }
+
+ Ok(())
+ }
+
+ fn after_list_element(&mut self, field: &NestedFieldRef) -> Result<()> {
+ self.field_names.pop();
+ // Mirror of `before_list_element`: only pop what was pushed.
+ if !field.field_type.is_struct() {
+ self.short_field_names.pop();
+ }
+
+ Ok(())
+ }
+
+ fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
+ // Map keys are named like struct fields in both paths.
+ self.before_struct_field(field)
+ }
+
+ fn after_map_key(&mut self, field: &NestedFieldRef) -> Result<()> {
+ self.after_struct_field(field)
+ }
+
+ fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
+ // Like list elements, struct-typed map values are left out of the short path.
+ self.field_names.push(field.name.to_string());
+ if !field.field_type.is_struct() {
+ self.short_field_names.push(field.name.to_string());
+ }
+ Ok(())
+ }
+
+ fn after_map_value(&mut self, field: &NestedFieldRef) -> Result<()> {
+ self.field_names.pop();
+ if !field.field_type.is_struct() {
+ self.short_field_names.pop();
+ }
+
+ Ok(())
+ }
+
+ fn schema(&mut self, _schema: &Schema, _value: Self::T) -> Result<Self::T>
{
+ Ok(())
+ }
+
+ fn field(&mut self, field: &NestedFieldRef, _value: Self::T) ->
Result<Self::T> {
+ // Runs after `after_struct_field`, so the stacks hold only the enclosing path.
+ self.add_field(field.name.as_str(), field.id)
+ }
+
+ fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>) ->
Result<Self::T> {
+ Ok(())
+ }
+
+ fn list(&mut self, list: &ListType, _value: Self::T) -> Result<Self::T> {
+ // The element field is registered as `<path of the list>.element`.
+ self.add_field(LIST_FILED_NAME, list.element_field.id)
+ }
+
+ fn map(&mut self, map: &MapType, _key_value: Self::T, _value: Self::T) ->
Result<Self::T> {
+ // Key and value fields are registered as `<path of the map>.key` / `.value`.
+ self.add_field(MAP_KEY_FIELD_NAME, map.key_field.id)?;
+ self.add_field(MAP_VALUE_FIELD_NAME, map.value_field.id)
+ }
+
+ fn primitive(&mut self, _p: &PrimitiveType) -> Result<Self::T> {
+ Ok(())
+ }
}
#[cfg(test)]
mod tests {
- use crate::spec::datatypes::{NestedField, PrimitiveType, Type};
+ use crate::spec::datatypes::Type::{List, Map, Primitive, Struct};
+ use crate::spec::datatypes::{
+ ListType, MapType, NestedField, NestedFieldRef, PrimitiveType,
StructType, Type,
+ };
use crate::spec::schema::Schema;
+ use std::collections::HashMap;
#[test]
fn test_construct_schema() {
- let field1 = NestedField::required(1, "f1",
Type::Primitive(PrimitiveType::Boolean));
- let field2 = NestedField::optional(2, "f2",
Type::Primitive(PrimitiveType::Int));
+ let field1: NestedFieldRef =
+ NestedField::required(1, "f1",
Type::Primitive(PrimitiveType::Boolean)).into();
+ let field2: NestedFieldRef =
+ NestedField::optional(2, "f2",
Type::Primitive(PrimitiveType::Int)).into();
let schema = Schema::builder()
.with_fields(vec![field1.clone()])
.with_fields(vec![field2.clone()])
.with_schema_id(3)
- .build();
+ .build()
+ .unwrap();
assert_eq!(3, schema.schema_id());
assert_eq!(2, schema.highest_field_id());
@@ -108,4 +638,428 @@ mod tests {
assert_eq!(Some(&field2), schema.field_by_id(2));
assert_eq!(None, schema.field_by_id(3));
}
+
+    /// Flat three-column fixture (`foo`, `bar`, `baz`) whose identifier field is `bar` (id 2).
+    fn table_schema_simple() -> Schema {
+        let fields: Vec<NestedFieldRef> = vec![
+            NestedField::optional(1, "foo", Primitive(PrimitiveType::String)).into(),
+            NestedField::required(2, "bar", Primitive(PrimitiveType::Int)).into(),
+            NestedField::optional(3, "baz", Primitive(PrimitiveType::Boolean)).into(),
+        ];
+
+        Schema::builder()
+            .with_schema_id(1)
+            .with_identifier_field_ids(vec![2])
+            .with_fields(fields)
+            .build()
+            .unwrap()
+    }
+
+    /// Fixture exercising every nesting form: primitives, a list of strings (`qux`),
+    /// a map whose values are maps (`quux`), a list of structs (`location`) and a
+    /// struct (`person`). Field ids are assigned depth-first, 1 through 17.
+    fn table_schema_nested() -> Schema {
+        // qux: list<string>
+        let qux = NestedField::required(
+            4,
+            "qux",
+            List(ListType {
+                element_field: NestedField::list_element(5, Primitive(PrimitiveType::String), true)
+                    .into(),
+            }),
+        );
+
+        // quux: map<string, map<string, int>>
+        let inner_map = Map(MapType {
+            key_field: NestedField::map_key_element(9, Primitive(PrimitiveType::String)).into(),
+            value_field: NestedField::map_value_element(10, Primitive(PrimitiveType::Int), true)
+                .into(),
+        });
+        let quux = NestedField::required(
+            6,
+            "quux",
+            Map(MapType {
+                key_field: NestedField::map_key_element(7, Primitive(PrimitiveType::String)).into(),
+                value_field: NestedField::map_value_element(8, inner_map, true).into(),
+            }),
+        );
+
+        // location: list<struct<latitude: float, longitude: float>>
+        let coordinates = Struct(StructType::new(vec![
+            NestedField::optional(13, "latitude", Primitive(PrimitiveType::Float)).into(),
+            NestedField::optional(14, "longitude", Primitive(PrimitiveType::Float)).into(),
+        ]));
+        let location = NestedField::required(
+            11,
+            "location",
+            List(ListType {
+                element_field: NestedField::list_element(12, coordinates, true).into(),
+            }),
+        );
+
+        // person: struct<name: string, age: int>
+        let person = NestedField::optional(
+            15,
+            "person",
+            Struct(StructType::new(vec![
+                NestedField::optional(16, "name", Primitive(PrimitiveType::String)).into(),
+                NestedField::required(17, "age", Primitive(PrimitiveType::Int)).into(),
+            ])),
+        );
+
+        Schema::builder()
+            .with_schema_id(1)
+            .with_identifier_field_ids(vec![2])
+            .with_fields(vec![
+                NestedField::optional(1, "foo", Primitive(PrimitiveType::String)).into(),
+                NestedField::required(2, "bar", Primitive(PrimitiveType::Int)).into(),
+                NestedField::optional(3, "baz", Primitive(PrimitiveType::Boolean)).into(),
+                qux.into(),
+                quux.into(),
+                location.into(),
+                person.into(),
+            ])
+            .build()
+            .unwrap()
+    }
+
+    #[test]
+    fn test_schema_display() {
+        // Prefixing "\n" lets the expected text start on its own line below `r#"`.
+        let rendered = format!("\n{}", table_schema_simple());
+        let expected_str = r#"
+table {
+ 1: foo: optional string
+ 2: bar: required int
+ 3: baz: optional boolean
+}
+"#;
+
+        assert_eq!(expected_str, rendered);
+    }
+
+    #[test]
+    fn test_schema_build_failed_on_duplicate_names() {
+        // Ids 3 and 4 share the name `baz`; building the schema must fail.
+        let fields: Vec<NestedFieldRef> = vec![
+            NestedField::required(1, "foo", Primitive(PrimitiveType::String)).into(),
+            NestedField::required(2, "bar", Primitive(PrimitiveType::Int)).into(),
+            NestedField::optional(3, "baz", Primitive(PrimitiveType::Boolean)).into(),
+            NestedField::optional(4, "baz", Primitive(PrimitiveType::Boolean)).into(),
+        ];
+
+        let err = Schema::builder()
+            .with_schema_id(1)
+            .with_identifier_field_ids(vec![1])
+            .with_fields(fields)
+            .build()
+            .unwrap_err();
+
+        assert!(err
+            .message()
+            .contains("Invalid schema: multiple fields for name baz"));
+    }
+
+    #[test]
+    fn test_schema_index_by_name() {
+        // Members of the `location` list-element struct are indexed twice:
+        // under their full path and under a short path that omits `element`.
+        let expected_name_to_id = [
+            ("foo", 1),
+            ("bar", 2),
+            ("baz", 3),
+            ("qux", 4),
+            ("qux.element", 5),
+            ("quux", 6),
+            ("quux.key", 7),
+            ("quux.value", 8),
+            ("quux.value.key", 9),
+            ("quux.value.value", 10),
+            ("location", 11),
+            ("location.element", 12),
+            ("location.element.latitude", 13),
+            ("location.element.longitude", 14),
+            ("location.latitude", 13),
+            ("location.longitude", 14),
+            ("person", 15),
+            ("person.name", 16),
+            ("person.age", 17),
+        ]
+        .iter()
+        .map(|(name, id)| (name.to_string(), *id))
+        .collect::<HashMap<_, _>>();
+
+        let schema = table_schema_nested();
+        assert_eq!(&expected_name_to_id, &schema.name_to_id);
+    }
+
+    #[test]
+    fn test_schema_find_column_name() {
+        // Every field of the nested fixture, including the top-level `person`
+        // struct and its members, must resolve to its full dotted path.
+        let expected_column_name = HashMap::from([
+            (1, "foo"),
+            (2, "bar"),
+            (3, "baz"),
+            (4, "qux"),
+            (5, "qux.element"),
+            (6, "quux"),
+            (7, "quux.key"),
+            (8, "quux.value"),
+            (9, "quux.value.key"),
+            (10, "quux.value.value"),
+            (11, "location"),
+            (12, "location.element"),
+            (13, "location.element.latitude"),
+            (14, "location.element.longitude"),
+            (15, "person"),
+            (16, "person.name"),
+            (17, "person.age"),
+        ]);
+
+        let schema = table_schema_nested();
+        for (id, name) in expected_column_name {
+            assert_eq!(
+                Some(name),
+                schema.name_by_field_id(id),
+                "Column name for field id {} not match.",
+                id
+            );
+        }
+    }
+
+    #[test]
+    fn test_schema_find_column_name_not_found() {
+        // 99 is above every id assigned in the fixture (1..=17).
+        let schema = table_schema_nested();
+        assert_eq!(None, schema.name_by_field_id(99));
+    }
+
+    #[test]
+    fn test_schema_find_column_name_by_id_simple() {
+        let schema = table_schema_simple();
+
+        // Top-level fields resolve to their bare names.
+        for (id, name) in [(1, "foo"), (2, "bar"), (3, "baz")] {
+            assert_eq!(
+                Some(name),
+                schema.name_by_field_id(id),
+                "Column name for field id {} not match.",
+                id
+            );
+        }
+    }
+
+    #[test]
+    fn test_schema_find_simple() {
+        let schema = table_schema_simple();
+        let fields = schema.r#struct.fields();
+
+        // Ids 1, 2, 3 map onto the top-level fields in declaration order.
+        for (position, id) in [(0_usize, 1), (1, 2), (2, 3)] {
+            assert_eq!(Some(&fields[position]), schema.field_by_id(id));
+        }
+
+        assert!(schema.field_by_id(4).is_none());
+        assert!(schema.field_by_name("non exist").is_none());
+    }
+
+    #[test]
+    fn test_schema_find_nested() {
+        // Composite types are produced by closures so each use gets a fresh
+        // value; `inner_map` and `coordinates` are each needed twice below.
+        let qux_list = || {
+            List(ListType {
+                element_field: NestedField::list_element(5, Primitive(PrimitiveType::String), true)
+                    .into(),
+            })
+        };
+        let inner_map = || {
+            Map(MapType {
+                key_field: NestedField::map_key_element(9, Primitive(PrimitiveType::String)).into(),
+                value_field: NestedField::map_value_element(10, Primitive(PrimitiveType::Int), true)
+                    .into(),
+            })
+        };
+        let coordinates = || {
+            Struct(StructType::new(vec![
+                NestedField::optional(13, "latitude", Primitive(PrimitiveType::Float)).into(),
+                NestedField::optional(14, "longitude", Primitive(PrimitiveType::Float)).into(),
+            ]))
+        };
+        let person = || {
+            Struct(StructType::new(vec![
+                NestedField::optional(16, "name", Primitive(PrimitiveType::String)).into(),
+                NestedField::required(17, "age", Primitive(PrimitiveType::Int)).into(),
+            ]))
+        };
+
+        let expected_id_to_field: HashMap<i32, NestedField> = HashMap::from([
+            (1, NestedField::optional(1, "foo", Primitive(PrimitiveType::String))),
+            (2, NestedField::required(2, "bar", Primitive(PrimitiveType::Int))),
+            (3, NestedField::optional(3, "baz", Primitive(PrimitiveType::Boolean))),
+            (4, NestedField::required(4, "qux", qux_list())),
+            (5, NestedField::required(5, "element", Primitive(PrimitiveType::String))),
+            (
+                6,
+                NestedField::required(
+                    6,
+                    "quux",
+                    Map(MapType {
+                        key_field: NestedField::map_key_element(7, Primitive(PrimitiveType::String))
+                            .into(),
+                        value_field: NestedField::map_value_element(8, inner_map(), true).into(),
+                    }),
+                ),
+            ),
+            (7, NestedField::required(7, "key", Primitive(PrimitiveType::String))),
+            (8, NestedField::required(8, "value", inner_map())),
+            (9, NestedField::required(9, "key", Primitive(PrimitiveType::String))),
+            (10, NestedField::required(10, "value", Primitive(PrimitiveType::Int))),
+            (
+                11,
+                NestedField::required(
+                    11,
+                    "location",
+                    List(ListType {
+                        element_field: NestedField::list_element(12, coordinates(), true).into(),
+                    }),
+                ),
+            ),
+            (12, NestedField::list_element(12, coordinates(), true)),
+            (13, NestedField::optional(13, "latitude", Primitive(PrimitiveType::Float))),
+            (14, NestedField::optional(14, "longitude", Primitive(PrimitiveType::Float))),
+            (15, NestedField::optional(15, "person", person())),
+            (16, NestedField::optional(16, "name", Primitive(PrimitiveType::String))),
+            (17, NestedField::required(17, "age", Primitive(PrimitiveType::Int))),
+        ]);
+
+        let schema = table_schema_nested();
+        for (id, field) in expected_id_to_field {
+            assert_eq!(
+                Some(&field),
+                schema.field_by_id(id).map(|f| f.as_ref()),
+                "Field for {} not match.",
+                id
+            );
+        }
+    }
}
diff --git a/crates/iceberg/src/spec/values.rs
b/crates/iceberg/src/spec/values.rs
index 771ffd8..5d81f95 100644
--- a/crates/iceberg/src/spec/values.rs
+++ b/crates/iceberg/src/spec/values.rs
@@ -407,8 +407,8 @@ impl Literal {
},
Type::Struct(schema) => {
if let JsonValue::Object(mut object) = value {
-
Ok(Some(Literal::Struct(Struct::from_iter(schema.iter().map(
- |field| {
+ Ok(Some(Literal::Struct(Struct::from_iter(
+ schema.fields().iter().map(|field| {
(
field.id,
object.remove(&field.id.to_string()).and_then(|value| {
@@ -423,8 +423,8 @@ impl Literal {
}),
field.name.clone(),
)
- },
- )))))
+ }),
+ ))))
} else {
Err(Error::new(
crate::ErrorKind::DataInvalid,
@@ -796,33 +796,9 @@ mod tests {
(3, None, "address".to_string()),
])),
&Type::Struct(StructType::new(vec![
- NestedField {
- id: 1,
- name: "id".to_string(),
- required: true,
- field_type: Box::new(Type::Primitive(PrimitiveType::Int)),
- doc: None,
- initial_default: None,
- write_default: None,
- },
- NestedField {
- id: 2,
- name: "name".to_string(),
- required: false,
- field_type:
Box::new(Type::Primitive(PrimitiveType::String)),
- doc: None,
- initial_default: None,
- write_default: None,
- },
- NestedField {
- id: 3,
- name: "address".to_string(),
- required: false,
- field_type:
Box::new(Type::Primitive(PrimitiveType::String)),
- doc: None,
- initial_default: None,
- write_default: None,
- },
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(2, "name",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::optional(3, "address",
Type::Primitive(PrimitiveType::String)).into(),
])),
);
}
@@ -840,15 +816,12 @@ mod tests {
None,
]),
&Type::List(ListType {
- element_field: NestedField {
- id: 0,
- name: "".to_string(),
- required: true,
- field_type: Box::new(Type::Primitive(PrimitiveType::Int)),
- doc: None,
- initial_default: None,
- write_default: None,
- },
+ element_field: NestedField::list_element(
+ 0,
+ Type::Primitive(PrimitiveType::Int),
+ true,
+ )
+ .into(),
}),
);
}
@@ -874,24 +847,14 @@ mod tests {
),
])),
&Type::Map(MapType {
- key_field: NestedField {
- id: 0,
- name: "key".to_string(),
- required: true,
- field_type:
Box::new(Type::Primitive(PrimitiveType::String)),
- doc: None,
- initial_default: None,
- write_default: None,
- },
- value_field: NestedField {
- id: 1,
- name: "value".to_string(),
- required: true,
- field_type: Box::new(Type::Primitive(PrimitiveType::Int)),
- doc: None,
- initial_default: None,
- write_default: None,
- },
+ key_field: NestedField::map_key_element(0,
Type::Primitive(PrimitiveType::String))
+ .into(),
+ value_field: NestedField::map_value_element(
+ 1,
+ Type::Primitive(PrimitiveType::Int),
+ true,
+ )
+ .into(),
}),
);
}