This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 58123990 feat: partition compatibility (#612)
58123990 is described below

commit 58123990c304b72139651a16f45f6505bc868cc3
Author: Christian <[email protected]>
AuthorDate: Sun Sep 8 18:18:41 2024 +0200

    feat: partition compatibility (#612)
    
    * Partition compatability
    
    * Partition compatability
    
    * Rename compatible_with -> is_compatible_with
---
 crates/iceberg/src/spec/partition.rs | 414 ++++++++++++++++++++++++++++++++++-
 1 file changed, 413 insertions(+), 1 deletion(-)

diff --git a/crates/iceberg/src/spec/partition.rs 
b/crates/iceberg/src/spec/partition.rs
index f262a8c7..36763df7 100644
--- a/crates/iceberg/src/spec/partition.rs
+++ b/crates/iceberg/src/spec/partition.rs
@@ -118,9 +118,63 @@ impl PartitionSpec {
     /// Turn this partition spec into an unbound partition spec.
     ///
     /// The `field_id` is retained as `partition_id` in the unbound partition 
spec.
-    pub fn to_unbound(self) -> UnboundPartitionSpec {
+    pub fn into_unbound(self) -> UnboundPartitionSpec {
         self.into()
     }
+
+    /// Check if this partition spec is compatible with another partition spec.
+    ///
+    /// Returns true if the partition spec is equal to the other spec with 
partition field ids ignored and
+    /// spec_id ignored. The following must be identical:
+    /// * The number of fields
+    /// * Field order
+    /// * Field names
+    /// * Source column ids
+    /// * Transforms
+    pub fn is_compatible_with(&self, other: &UnboundPartitionSpec) -> bool {
+        if self.fields.len() != other.fields.len() {
+            return false;
+        }
+
+        for (this_field, other_field) in self.fields.iter().zip(&other.fields) 
{
+            if this_field.source_id != other_field.source_id
+                || this_field.transform != other_field.transform
+                || this_field.name != other_field.name
+            {
+                return false;
+            }
+        }
+
+        true
+    }
+
+    /// Check if this partition spec has sequential partition ids.
+    /// Sequential ids start from 1000 and increment by 1 for each field.
+    /// This is required for spec version 1
+    pub fn has_sequential_ids(&self) -> bool {
+        for (index, field) in self.fields.iter().enumerate() {
+            let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64)
+                .checked_add(1)
+                .and_then(|id| id.checked_add(index as i64))
+                .unwrap_or(i64::MAX);
+
+            if field.field_id as i64 != expected_id {
+                return false;
+            }
+        }
+
+        true
+    }
+
+    /// Get the highest field id in the partition spec.
+    /// If the partition spec is unpartitioned, it returns the last 
unpartitioned last assigned id (999).
+    pub fn highest_field_id(&self) -> i32 {
+        self.fields
+            .iter()
+            .map(|f| f.field_id)
+            .max()
+            .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID)
+    }
 }
 
 /// Reference to [`UnboundPartitionSpec`].
@@ -171,6 +225,14 @@ impl UnboundPartitionSpec {
     pub fn fields(&self) -> &[UnboundPartitionField] {
         &self.fields
     }
+
+    /// Change the spec id of the partition spec
+    pub fn with_spec_id(self, spec_id: i32) -> Self {
+        Self {
+            spec_id: Some(spec_id),
+            ..self
+        }
+    }
 }
 
 impl From<PartitionField> for UnboundPartitionField {
@@ -1263,4 +1325,354 @@ mod tests {
             }]
         });
     }
+
+    #[test]
+    fn test_is_compatible_with() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_spec_1 = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: None,
+                name: "id_bucket".to_string(),
+                transform: Transform::Bucket(16),
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let partition_spec_2 = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: None,
+                name: "id_bucket".to_string(),
+                transform: Transform::Bucket(16),
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        
assert!(partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));
+    }
+
+    #[test]
+    fn test_not_compatible_with_transform_different() {
+        let schema = Schema::builder()
+            .with_fields(vec![NestedField::required(
+                1,
+                "id",
+                Type::Primitive(crate::spec::PrimitiveType::Int),
+            )
+            .into()])
+            .build()
+            .unwrap();
+
+        let partition_spec_1 = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: None,
+                name: "id_bucket".to_string(),
+                transform: Transform::Bucket(16),
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let partition_spec_2 = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: None,
+                name: "id_bucket".to_string(),
+                transform: Transform::Bucket(32),
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        
assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));
+    }
+
+    #[test]
+    fn test_not_compatible_with_source_id_different() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_spec_1 = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: None,
+                name: "id_bucket".to_string(),
+                transform: Transform::Bucket(16),
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let partition_spec_2 = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 2,
+                field_id: None,
+                name: "id_bucket".to_string(),
+                transform: Transform::Bucket(16),
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        
assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));
+    }
+
+    #[test]
+    fn test_not_compatible_with_order_different() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_spec_1 = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: None,
+                name: "id_bucket".to_string(),
+                transform: Transform::Bucket(16),
+            })
+            .unwrap()
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 2,
+                field_id: None,
+                name: "name".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let partition_spec_2 = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 2,
+                field_id: None,
+                name: "name".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: None,
+                name: "id_bucket".to_string(),
+                transform: Transform::Bucket(16),
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        
assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));
+    }
+
+    #[test]
+    fn test_highest_field_id_unpartitioned() {
+        let spec = 
PartitionSpec::builder(&Schema::builder().with_fields(vec![]).build().unwrap())
+            .with_spec_id(1)
+            .build()
+            .unwrap();
+
+        assert_eq!(UNPARTITIONED_LAST_ASSIGNED_ID, spec.highest_field_id());
+    }
+
+    #[test]
+    fn test_highest_field_id() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        let spec = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: Some(1001),
+                name: "id".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 2,
+                field_id: Some(1000),
+                name: "name".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(1001, spec.highest_field_id());
+    }
+
+    #[test]
+    fn test_has_sequential_ids() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        let spec = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: Some(1000),
+                name: "id".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 2,
+                field_id: Some(1001),
+                name: "name".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(1000, spec.fields[0].field_id);
+        assert_eq!(1001, spec.fields[1].field_id);
+        assert!(spec.has_sequential_ids());
+    }
+
+    #[test]
+    fn test_sequential_ids_must_start_at_1000() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        let spec = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: Some(999),
+                name: "id".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 2,
+                field_id: Some(1000),
+                name: "name".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(999, spec.fields[0].field_id);
+        assert_eq!(1000, spec.fields[1].field_id);
+        assert!(!spec.has_sequential_ids());
+    }
+
+    #[test]
+    fn test_sequential_ids_must_have_no_gaps() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        let spec = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: Some(1000),
+                name: "id".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 2,
+                field_id: Some(1002),
+                name: "name".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(1000, spec.fields[0].field_id);
+        assert_eq!(1002, spec.fields[1].field_id);
+        assert!(!spec.has_sequential_ids());
+    }
 }

Reply via email to