This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 2bc66c43 feat: Reassign field ids for schema (#615)
2bc66c43 is described below
commit 2bc66c43cf0aa7cf79e6c1b82a47c39598616d77
Author: Christian <[email protected]>
AuthorDate: Fri Oct 4 05:22:10 2024 +0200
feat: Reassign field ids for schema (#615)
* Reassign field ids for schema
* Address comments
* Schema ensure unique field ids
* Fix tests with duplicate nested field ids
* Use Schema::builder() for reassigned ids
* Better docs
---
crates/iceberg/src/arrow/schema.rs | 20 +-
crates/iceberg/src/spec/datatypes.rs | 6 +
crates/iceberg/src/spec/schema.rs | 395 ++++++++++++++++++++++++++++++++++-
crates/iceberg/src/spec/values.rs | 22 +-
4 files changed, 411 insertions(+), 32 deletions(-)
diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs
index a32c10a2..e73b409c 100644
--- a/crates/iceberg/src/arrow/schema.rs
+++ b/crates/iceberg/src/arrow/schema.rs
@@ -826,8 +826,8 @@ mod tests {
fn arrow_schema_for_arrow_schema_to_schema_test() -> ArrowSchema {
let fields = Fields::from(vec![
- simple_field("key", DataType::Int32, false, "17"),
- simple_field("value", DataType::Utf8, true, "18"),
+ simple_field("key", DataType::Int32, false, "28"),
+ simple_field("value", DataType::Utf8, true, "29"),
]);
let r#struct = DataType::Struct(fields);
@@ -1057,9 +1057,9 @@ mod tests {
"required": true,
"type": {
"type": "map",
- "key-id": 17,
+ "key-id": 28,
"key": "int",
- "value-id": 18,
+ "value-id": 29,
"value-required": false,
"value": "string"
}
@@ -1110,8 +1110,8 @@ mod tests {
fn arrow_schema_for_schema_to_arrow_schema_test() -> ArrowSchema {
let fields = Fields::from(vec![
- simple_field("key", DataType::Int32, false, "17"),
- simple_field("value", DataType::Utf8, true, "18"),
+ simple_field("key", DataType::Int32, false, "28"),
+ simple_field("value", DataType::Utf8, true, "29"),
]);
let r#struct = DataType::Struct(fields);
@@ -1200,7 +1200,7 @@ mod tests {
),
simple_field("map", map, false, "16"),
simple_field("struct", r#struct, false, "17"),
- simple_field("uuid", DataType::FixedSizeBinary(16), false, "26"),
+ simple_field("uuid", DataType::FixedSizeBinary(16), false, "30"),
])
}
@@ -1344,9 +1344,9 @@ mod tests {
"required": true,
"type": {
"type": "map",
- "key-id": 17,
+ "key-id": 28,
"key": "int",
- "value-id": 18,
+ "value-id": 29,
"value-required": false,
"value": "string"
}
@@ -1380,7 +1380,7 @@ mod tests {
}
},
{
- "id":26,
+ "id":30,
"name":"uuid",
"required":true,
"type":"uuid"
diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs
index d3824596..bce10ad5 100644
--- a/crates/iceberg/src/spec/datatypes.rs
+++ b/crates/iceberg/src/spec/datatypes.rs
@@ -668,6 +668,12 @@ impl NestedField {
self.write_default = Some(value);
self
}
+
+ /// Set the id of the field.
+ pub(crate) fn with_id(mut self, id: i32) -> Self {
+ self.id = id;
+ self
+ }
}
impl fmt::Display for NestedField {
diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs
index 63a9e3cb..cf86874d 100644
--- a/crates/iceberg/src/spec/schema.rs
+++ b/crates/iceberg/src/spec/schema.rs
@@ -39,7 +39,7 @@ use crate::{ensure_data_valid, Error, ErrorKind};
pub type SchemaId = i32;
/// Reference to [`Schema`].
pub type SchemaRef = Arc<Schema>;
-const DEFAULT_SCHEMA_ID: SchemaId = 0;
+pub(crate) const DEFAULT_SCHEMA_ID: SchemaId = 0;
/// Defines schema in iceberg.
#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -77,6 +77,7 @@ pub struct SchemaBuilder {
fields: Vec<NestedFieldRef>,
alias_to_id: BiHashMap<String, i32>,
identifier_field_ids: HashSet<i32>,
+ reassign_field_ids_from: Option<i32>,
}
impl SchemaBuilder {
@@ -86,6 +87,16 @@ impl SchemaBuilder {
self
}
+ /// Reassign all field-ids (including nested) on build.
+ /// Reassignment starts from the field-id specified in `start_from` (inclusive).
+ ///
+ /// All specified aliases and identifier fields will be updated to the new field-ids.
+ #[allow(dead_code)] // Will be needed in TableMetadataBuilder
+ pub(crate) fn with_reassigned_field_ids(mut self, start_from: u32) -> Self
{
+ self.reassign_field_ids_from =
Some(start_from.try_into().unwrap_or(i32::MAX));
+ self
+ }
+
/// Set schema id.
pub fn with_schema_id(mut self, schema_id: i32) -> Self {
self.schema_id = schema_id;
@@ -130,7 +141,7 @@ impl SchemaBuilder {
let highest_field_id = id_to_field.keys().max().cloned().unwrap_or(0);
- Ok(Schema {
+ let mut schema = Schema {
r#struct,
schema_id: self.schema_id,
highest_field_id,
@@ -143,7 +154,24 @@ impl SchemaBuilder {
id_to_name,
field_id_to_accessor,
- })
+ };
+
+ if let Some(start_from) = self.reassign_field_ids_from {
+ let mut id_reassigner = ReassignFieldIds::new(start_from);
+ let new_fields =
id_reassigner.reassign_field_ids(schema.r#struct.fields().to_vec())?;
+ let new_identifier_field_ids =
+
id_reassigner.apply_to_identifier_fields(schema.identifier_field_ids)?;
+ let new_alias_to_id =
id_reassigner.apply_to_aliases(schema.alias_to_id.clone())?;
+
+ schema = Schema::builder()
+ .with_schema_id(schema.schema_id)
+ .with_fields(new_fields)
+ .with_identifier_field_ids(new_identifier_field_ids)
+ .with_alias(new_alias_to_id)
+ .build()?;
+ }
+
+ Ok(schema)
}
fn build_accessors(&self) -> HashMap<i32, Arc<StructAccessor>> {
@@ -265,6 +293,7 @@ impl Schema {
fields: vec![],
identifier_field_ids: HashSet::default(),
alias_to_id: BiHashMap::default(),
+ reassign_field_ids_from: None,
}
}
@@ -275,6 +304,7 @@ impl Schema {
fields: self.r#struct.fields().to_vec(),
alias_to_id: self.alias_to_id,
identifier_field_ids: self.identifier_field_ids,
+ reassign_field_ids_from: None,
}
}
@@ -475,8 +505,7 @@ pub fn index_by_id(r#struct: &StructType) -> Result<HashMap<i32, NestedFieldRef>
}
fn field(&mut self, field: &NestedFieldRef, _value: ()) -> Result<()> {
- self.0.insert(field.id, field.clone());
- Ok(())
+ try_insert_field(&mut self.0, field.id, field.clone())
}
fn r#struct(&mut self, _struct: &StructType, _results: Vec<Self::T>)
-> Result<Self::T> {
@@ -484,15 +513,16 @@ pub fn index_by_id(r#struct: &StructType) -> Result<HashMap<i32, NestedFieldRef>
}
fn list(&mut self, list: &ListType, _value: Self::T) ->
Result<Self::T> {
- self.0
- .insert(list.element_field.id, list.element_field.clone());
- Ok(())
+ try_insert_field(
+ &mut self.0,
+ list.element_field.id,
+ list.element_field.clone(),
+ )
}
fn map(&mut self, map: &MapType, _key_value: Self::T, _value: Self::T)
-> Result<Self::T> {
- self.0.insert(map.key_field.id, map.key_field.clone());
- self.0.insert(map.value_field.id, map.value_field.clone());
- Ok(())
+ try_insert_field(&mut self.0, map.key_field.id,
map.key_field.clone())?;
+ try_insert_field(&mut self.0, map.value_field.id,
map.value_field.clone())
}
fn primitive(&mut self, _: &PrimitiveType) -> Result<Self::T> {
@@ -943,6 +973,148 @@ impl SchemaVisitor for PruneColumn {
}
}
+struct ReassignFieldIds {
+ next_field_id: i32,
+ old_to_new_id: HashMap<i32, i32>,
+}
+
+fn try_insert_field<V>(map: &mut HashMap<i32, V>, field_id: i32, value: V) ->
Result<()> {
+ map.insert(field_id, value).map_or_else(
+ || Ok(()),
+ |_| {
+ Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Found duplicate 'field.id' {}. Field ids must be unique.",
+ field_id
+ ),
+ ))
+ },
+ )
+}
+
+// We are not using the visitor here, as post order traversal is not desired.
+// Instead we want to re-assign all fields on one level first before diving deeper.
+impl ReassignFieldIds {
+ fn new(start_from: i32) -> Self {
+ Self {
+ next_field_id: start_from,
+ old_to_new_id: HashMap::new(),
+ }
+ }
+
+ fn reassign_field_ids(&mut self, fields: Vec<NestedFieldRef>) ->
Result<Vec<NestedFieldRef>> {
+ // Visit fields on the same level first
+ let outer_fields = fields
+ .into_iter()
+ .map(|field| {
+ try_insert_field(&mut self.old_to_new_id, field.id,
self.next_field_id)?;
+ let new_field =
Arc::unwrap_or_clone(field).with_id(self.next_field_id);
+ self.increase_next_field_id()?;
+ Ok(Arc::new(new_field))
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ // Now visit nested fields
+ outer_fields
+ .into_iter()
+ .map(|field| {
+ if field.field_type.is_primitive() {
+ Ok(field)
+ } else {
+ let mut new_field = Arc::unwrap_or_clone(field);
+ *new_field.field_type =
self.reassign_ids_visit_type(*new_field.field_type)?;
+ Ok(Arc::new(new_field))
+ }
+ })
+ .collect()
+ }
+
+ fn reassign_ids_visit_type(&mut self, field_type: Type) -> Result<Type> {
+ match field_type {
+ Type::Primitive(s) => Ok(Type::Primitive(s)),
+ Type::Struct(s) => {
+ let new_fields = self.reassign_field_ids(s.fields().to_vec())?;
+ Ok(Type::Struct(StructType::new(new_fields)))
+ }
+ Type::List(l) => {
+ self.old_to_new_id
+ .insert(l.element_field.id, self.next_field_id);
+ let mut element_field = Arc::unwrap_or_clone(l.element_field);
+ element_field.id = self.next_field_id;
+ self.increase_next_field_id()?;
+ *element_field.field_type =
+ self.reassign_ids_visit_type(*element_field.field_type)?;
+ Ok(Type::List(ListType {
+ element_field: Arc::new(element_field),
+ }))
+ }
+ Type::Map(m) => {
+ self.old_to_new_id
+ .insert(m.key_field.id, self.next_field_id);
+ let mut key_field = Arc::unwrap_or_clone(m.key_field);
+ key_field.id = self.next_field_id;
+ self.increase_next_field_id()?;
+ *key_field.field_type =
self.reassign_ids_visit_type(*key_field.field_type)?;
+
+ self.old_to_new_id
+ .insert(m.value_field.id, self.next_field_id);
+ let mut value_field = Arc::unwrap_or_clone(m.value_field);
+ value_field.id = self.next_field_id;
+ self.increase_next_field_id()?;
+ *value_field.field_type =
self.reassign_ids_visit_type(*value_field.field_type)?;
+
+ Ok(Type::Map(MapType {
+ key_field: Arc::new(key_field),
+ value_field: Arc::new(value_field),
+ }))
+ }
+ }
+ }
+
+ fn increase_next_field_id(&mut self) -> Result<()> {
+ self.next_field_id = self.next_field_id.checked_add(1).ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Field ID overflowed, cannot add more fields",
+ )
+ })?;
+ Ok(())
+ }
+
+ fn apply_to_identifier_fields(&self, field_ids: HashSet<i32>) ->
Result<HashSet<i32>> {
+ field_ids
+ .into_iter()
+ .map(|id| {
+ self.old_to_new_id.get(&id).copied().ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Identifier Field ID {} not found", id),
+ )
+ })
+ })
+ .collect()
+ }
+
+ fn apply_to_aliases(&self, alias: BiHashMap<String, i32>) ->
Result<BiHashMap<String, i32>> {
+ alias
+ .into_iter()
+ .map(|(name, id)| {
+ self.old_to_new_id
+ .get(&id)
+ .copied()
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Field with id {} for alias {} not found",
id, name),
+ )
+ })
+ .map(|new_id| (name, new_id))
+ })
+ .collect()
+ }
+}
+
pub(super) mod _serde {
/// This is a helper module that defines types to help with serialization/deserialization.
/// For deserialization the input first gets read into either the [SchemaV1] or [SchemaV2] struct
@@ -1062,6 +1234,8 @@ pub(super) mod _serde {
mod tests {
use std::collections::{HashMap, HashSet};
+ use bimap::BiHashMap;
+
use super::DEFAULT_SCHEMA_ID;
use crate::spec::datatypes::Type::{List, Map, Primitive, Struct};
use crate::spec::datatypes::{
@@ -2237,4 +2411,203 @@ table {
let schema = table_schema_simple().0;
assert_eq!(3, schema.highest_field_id());
}
+
+ #[test]
+ fn test_highest_field_id_no_fields() {
+ let schema = Schema::builder().with_schema_id(1).build().unwrap();
+ assert_eq!(0, schema.highest_field_id());
+ }
+
+ #[test]
+ fn test_reassign_ids() {
+ let schema = Schema::builder()
+ .with_schema_id(1)
+ .with_identifier_field_ids(vec![3])
+ .with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(),
3)]))
+ .with_fields(vec![
+ NestedField::optional(5, "foo",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::required(3, "bar",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(4, "baz",
Type::Primitive(PrimitiveType::Boolean)).into(),
+ ])
+ .build()
+ .unwrap();
+
+ let reassigned_schema = schema
+ .into_builder()
+ .with_reassigned_field_ids(0)
+ .build()
+ .unwrap();
+
+ let expected = Schema::builder()
+ .with_schema_id(1)
+ .with_identifier_field_ids(vec![1])
+ .with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(),
1)]))
+ .with_fields(vec![
+ NestedField::optional(0, "foo",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::required(1, "bar",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(2, "baz",
Type::Primitive(PrimitiveType::Boolean)).into(),
+ ])
+ .build()
+ .unwrap();
+
+ pretty_assertions::assert_eq!(expected, reassigned_schema);
+ assert_eq!(reassigned_schema.highest_field_id(), 2);
+ }
+
+ #[test]
+ fn test_reassigned_ids_nested() {
+ let schema = table_schema_nested();
+ let reassigned_schema = schema
+ .into_builder()
+ .with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(),
2)]))
+ .with_reassigned_field_ids(0)
+ .build()
+ .unwrap();
+
+ let expected = Schema::builder()
+ .with_schema_id(1)
+ .with_identifier_field_ids(vec![1])
+ .with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(),
1)]))
+ .with_fields(vec![
+ NestedField::optional(0, "foo",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::required(1, "bar",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(2, "baz",
Type::Primitive(PrimitiveType::Boolean)).into(),
+ NestedField::required(
+ 3,
+ "qux",
+ Type::List(ListType {
+ element_field: NestedField::list_element(
+ 7,
+ Type::Primitive(PrimitiveType::String),
+ true,
+ )
+ .into(),
+ }),
+ )
+ .into(),
+ NestedField::required(
+ 4,
+ "quux",
+ Type::Map(MapType {
+ key_field: NestedField::map_key_element(
+ 8,
+ Type::Primitive(PrimitiveType::String),
+ )
+ .into(),
+ value_field: NestedField::map_value_element(
+ 9,
+ Type::Map(MapType {
+ key_field: NestedField::map_key_element(
+ 10,
+ Type::Primitive(PrimitiveType::String),
+ )
+ .into(),
+ value_field: NestedField::map_value_element(
+ 11,
+ Type::Primitive(PrimitiveType::Int),
+ true,
+ )
+ .into(),
+ }),
+ true,
+ )
+ .into(),
+ }),
+ )
+ .into(),
+ NestedField::required(
+ 5,
+ "location",
+ Type::List(ListType {
+ element_field: NestedField::list_element(
+ 12,
+ Type::Struct(StructType::new(vec![
+ NestedField::optional(
+ 13,
+ "latitude",
+ Type::Primitive(PrimitiveType::Float),
+ )
+ .into(),
+ NestedField::optional(
+ 14,
+ "longitude",
+ Type::Primitive(PrimitiveType::Float),
+ )
+ .into(),
+ ])),
+ true,
+ )
+ .into(),
+ }),
+ )
+ .into(),
+ NestedField::optional(
+ 6,
+ "person",
+ Type::Struct(StructType::new(vec![
+ NestedField::optional(15, "name",
Type::Primitive(PrimitiveType::String))
+ .into(),
+ NestedField::required(16, "age",
Type::Primitive(PrimitiveType::Int))
+ .into(),
+ ])),
+ )
+ .into(),
+ ])
+ .build()
+ .unwrap();
+
+ pretty_assertions::assert_eq!(expected, reassigned_schema);
+ assert_eq!(reassigned_schema.highest_field_id(), 16);
+ assert_eq!(reassigned_schema.field_by_id(6).unwrap().name, "person");
+ assert_eq!(reassigned_schema.field_by_id(16).unwrap().name, "age");
+ }
+
+ #[test]
+ fn test_reassign_ids_fails_with_duplicate_ids() {
+ let reassigned_schema = Schema::builder()
+ .with_schema_id(1)
+ .with_identifier_field_ids(vec![5])
+ .with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(),
3)]))
+ .with_fields(vec![
+ NestedField::required(5, "foo",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::optional(3, "bar",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(3, "baz",
Type::Primitive(PrimitiveType::Boolean)).into(),
+ ])
+ .with_reassigned_field_ids(0)
+ .build()
+ .unwrap_err();
+
+ assert!(reassigned_schema.message().contains("'field.id' 3"));
+ }
+
+ #[test]
+ fn test_field_ids_must_be_unique() {
+ let reassigned_schema = Schema::builder()
+ .with_schema_id(1)
+ .with_identifier_field_ids(vec![5])
+ .with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(),
3)]))
+ .with_fields(vec![
+ NestedField::required(5, "foo",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::optional(3, "bar",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(3, "baz",
Type::Primitive(PrimitiveType::Boolean)).into(),
+ ])
+ .build()
+ .unwrap_err();
+
+ assert!(reassigned_schema.message().contains("'field.id' 3"));
+ }
+
+ #[test]
+ fn test_reassign_ids_empty_schema() {
+ let schema = Schema::builder().with_schema_id(1).build().unwrap();
+ let reassigned_schema = schema
+ .clone()
+ .into_builder()
+ .with_reassigned_field_ids(0)
+ .build()
+ .unwrap();
+
+ assert_eq!(schema, reassigned_schema);
+ assert_eq!(schema.highest_field_id(), 0);
+ }
}
diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs
index 3568d3dc..3c6e2aa6 100644
--- a/crates/iceberg/src/spec/values.rs
+++ b/crates/iceberg/src/spec/values.rs
@@ -3192,10 +3192,10 @@ mod tests {
(Literal::Primitive(PrimitiveLiteral::Int(3)), None),
])),
&Type::Map(MapType {
- key_field: NestedField::map_key_element(0,
Type::Primitive(PrimitiveType::Int))
+ key_field: NestedField::map_key_element(2,
Type::Primitive(PrimitiveType::Int))
.into(),
value_field: NestedField::map_value_element(
- 1,
+ 3,
Type::Primitive(PrimitiveType::Long),
false,
)
@@ -3219,10 +3219,10 @@ mod tests {
),
])),
&Type::Map(MapType {
- key_field: NestedField::map_key_element(0,
Type::Primitive(PrimitiveType::Int))
+ key_field: NestedField::map_key_element(2,
Type::Primitive(PrimitiveType::Int))
.into(),
value_field: NestedField::map_value_element(
- 1,
+ 3,
Type::Primitive(PrimitiveType::Long),
true,
)
@@ -3249,10 +3249,10 @@ mod tests {
),
])),
&Type::Map(MapType {
- key_field: NestedField::map_key_element(0,
Type::Primitive(PrimitiveType::String))
+ key_field: NestedField::map_key_element(2,
Type::Primitive(PrimitiveType::String))
.into(),
value_field: NestedField::map_value_element(
- 1,
+ 3,
Type::Primitive(PrimitiveType::Int),
false,
)
@@ -3276,10 +3276,10 @@ mod tests {
),
])),
&Type::Map(MapType {
- key_field: NestedField::map_key_element(0,
Type::Primitive(PrimitiveType::String))
+ key_field: NestedField::map_key_element(2,
Type::Primitive(PrimitiveType::String))
.into(),
value_field: NestedField::map_value_element(
- 1,
+ 3,
Type::Primitive(PrimitiveType::Int),
true,
)
@@ -3299,9 +3299,9 @@ mod tests {
None,
])),
&Type::Struct(StructType::new(vec![
- NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
- NestedField::optional(2, "name",
Type::Primitive(PrimitiveType::String)).into(),
- NestedField::optional(3, "address",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::required(2, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::optional(3, "name",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::optional(4, "address",
Type::Primitive(PrimitiveType::String)).into(),
])),
);
}