This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 58123990 feat: partition compatibility (#612)
58123990 is described below
commit 58123990c304b72139651a16f45f6505bc868cc3
Author: Christian <[email protected]>
AuthorDate: Sun Sep 8 18:18:41 2024 +0200
feat: partition compatibility (#612)
* Partition compatibility
* Partition compatibility
* Rename compatible_with -> is_compatible_with
---
crates/iceberg/src/spec/partition.rs | 414 ++++++++++++++++++++++++++++++++++-
1 file changed, 413 insertions(+), 1 deletion(-)
diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs
index f262a8c7..36763df7 100644
--- a/crates/iceberg/src/spec/partition.rs
+++ b/crates/iceberg/src/spec/partition.rs
@@ -118,9 +118,63 @@ impl PartitionSpec {
/// Turn this partition spec into an unbound partition spec.
///
/// The `field_id` is retained as `partition_id` in the unbound partition
spec.
- pub fn to_unbound(self) -> UnboundPartitionSpec {
+ pub fn into_unbound(self) -> UnboundPartitionSpec {
self.into()
}
+
+ /// Check if this partition spec is compatible with another partition spec.
+ ///
+ /// Returns true if the partition spec is equal to the other spec with
partition field ids ignored and
+ /// spec_id ignored. The following must be identical:
+ /// * The number of fields
+ /// * Field order
+ /// * Field names
+ /// * Source column ids
+ /// * Transforms
+ pub fn is_compatible_with(&self, other: &UnboundPartitionSpec) -> bool {
+ if self.fields.len() != other.fields.len() {
+ return false;
+ }
+
+ for (this_field, other_field) in self.fields.iter().zip(&other.fields)
{
+ if this_field.source_id != other_field.source_id
+ || this_field.transform != other_field.transform
+ || this_field.name != other_field.name
+ {
+ return false;
+ }
+ }
+
+ true
+ }
+
+ /// Check if this partition spec has sequential partition ids.
+ /// Sequential ids start from 1000 and increment by 1 for each field.
+ /// This is required for spec version 1
+ pub fn has_sequential_ids(&self) -> bool {
+ for (index, field) in self.fields.iter().enumerate() {
+ let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64)
+ .checked_add(1)
+ .and_then(|id| id.checked_add(index as i64))
+ .unwrap_or(i64::MAX);
+
+ if field.field_id as i64 != expected_id {
+ return false;
+ }
+ }
+
+ true
+ }
+
+ /// Get the highest field id in the partition spec.
+ /// If the partition spec is unpartitioned, it returns the last
unpartitioned last assigned id (999).
+ pub fn highest_field_id(&self) -> i32 {
+ self.fields
+ .iter()
+ .map(|f| f.field_id)
+ .max()
+ .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID)
+ }
}
/// Reference to [`UnboundPartitionSpec`].
@@ -171,6 +225,14 @@ impl UnboundPartitionSpec {
/// Returns the fields of this unbound partition spec, in order.
pub fn fields(&self) -> &[UnboundPartitionField] {
    &self.fields[..]
}
+
+ /// Change the spec id of the partition spec
+ pub fn with_spec_id(self, spec_id: i32) -> Self {
+ Self {
+ spec_id: Some(spec_id),
+ ..self
+ }
+ }
}
impl From<PartitionField> for UnboundPartitionField {
@@ -1263,4 +1325,354 @@ mod tests {
}]
});
}
+
#[test]
fn test_is_compatible_with() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
                .into(),
            NestedField::required(
                2,
                "name",
                Type::Primitive(crate::spec::PrimitiveType::String),
            )
            .into(),
        ])
        .build()
        .unwrap();

    let partition_spec_1 = PartitionSpec::builder(&schema)
        .with_spec_id(1)
        .add_unbound_field(UnboundPartitionField {
            source_id: 1,
            field_id: None,
            name: "id_bucket".to_string(),
            transform: Transform::Bucket(16),
        })
        .unwrap()
        .build()
        .unwrap();

    let partition_spec_2 = PartitionSpec::builder(&schema)
        .with_spec_id(1)
        .add_unbound_field(UnboundPartitionField {
            source_id: 1,
            field_id: None,
            name: "id_bucket".to_string(),
            transform: Transform::Bucket(16),
        })
        .unwrap()
        .build()
        .unwrap();

    assert!(partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));

    // Compatibility is documented to ignore both the spec id and the partition
    // field ids, so a spec differing only in those must still be compatible.
    let partition_spec_3 = PartitionSpec::builder(&schema)
        .with_spec_id(2)
        .add_unbound_field(UnboundPartitionField {
            source_id: 1,
            field_id: Some(1005),
            name: "id_bucket".to_string(),
            transform: Transform::Bucket(16),
        })
        .unwrap()
        .build()
        .unwrap();

    assert!(partition_spec_1.is_compatible_with(&partition_spec_3.into_unbound()));
}

#[test]
fn test_not_compatible_with_name_different() {
    let schema = Schema::builder()
        .with_fields(vec![NestedField::required(
            1,
            "id",
            Type::Primitive(crate::spec::PrimitiveType::Int),
        )
        .into()])
        .build()
        .unwrap();

    let partition_spec_1 = PartitionSpec::builder(&schema)
        .with_spec_id(1)
        .add_unbound_field(UnboundPartitionField {
            source_id: 1,
            field_id: None,
            name: "id_bucket".to_string(),
            transform: Transform::Bucket(16),
        })
        .unwrap()
        .build()
        .unwrap();

    // Identical source and transform; only the partition field name differs.
    let partition_spec_2 = PartitionSpec::builder(&schema)
        .with_spec_id(1)
        .add_unbound_field(UnboundPartitionField {
            source_id: 1,
            field_id: None,
            name: "id_bucket_16".to_string(),
            transform: Transform::Bucket(16),
        })
        .unwrap()
        .build()
        .unwrap();

    assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));
}

#[test]
fn test_not_compatible_with_field_count_different() {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
                .into(),
            NestedField::required(
                2,
                "name",
                Type::Primitive(crate::spec::PrimitiveType::String),
            )
            .into(),
        ])
        .build()
        .unwrap();

    let partition_spec_1 = PartitionSpec::builder(&schema)
        .with_spec_id(1)
        .add_unbound_field(UnboundPartitionField {
            source_id: 1,
            field_id: None,
            name: "id_bucket".to_string(),
            transform: Transform::Bucket(16),
        })
        .unwrap()
        .build()
        .unwrap();

    // Same leading field, plus one extra field: the field counts differ.
    let partition_spec_2 = PartitionSpec::builder(&schema)
        .with_spec_id(1)
        .add_unbound_field(UnboundPartitionField {
            source_id: 1,
            field_id: None,
            name: "id_bucket".to_string(),
            transform: Transform::Bucket(16),
        })
        .unwrap()
        .add_unbound_field(UnboundPartitionField {
            source_id: 2,
            field_id: None,
            name: "name".to_string(),
            transform: Transform::Identity,
        })
        .unwrap()
        .build()
        .unwrap();

    assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));
}
+
#[test]
fn test_not_compatible_with_transform_different() {
    let id_column = NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int));
    let schema = Schema::builder()
        .with_fields(vec![id_column.into()])
        .build()
        .unwrap();

    // Builds a single-field spec that buckets `id` into `buckets` buckets; the
    // two specs below differ only in that bucket count.
    let bucketed_by = |buckets: u32| {
        PartitionSpec::builder(&schema)
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                source_id: 1,
                field_id: None,
                name: "id_bucket".to_string(),
                transform: Transform::Bucket(buckets),
            })
            .unwrap()
            .build()
            .unwrap()
    };

    let sixteen_buckets = bucketed_by(16);
    let thirty_two_buckets = bucketed_by(32);

    assert!(!sixteen_buckets.is_compatible_with(&thirty_two_buckets.into_unbound()));
}
+
#[test]
fn test_not_compatible_with_source_id_different() {
    let int_type = Type::Primitive(crate::spec::PrimitiveType::Int);
    let string_type = Type::Primitive(crate::spec::PrimitiveType::String);
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", int_type).into(),
            NestedField::required(2, "name", string_type).into(),
        ])
        .build()
        .unwrap();

    // Same partition name and transform; only the bucketed source column varies.
    let bucket_on = |source_id: i32| {
        PartitionSpec::builder(&schema)
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                source_id,
                field_id: None,
                name: "id_bucket".to_string(),
                transform: Transform::Bucket(16),
            })
            .unwrap()
            .build()
            .unwrap()
    };

    let on_id_column = bucket_on(1);
    let on_name_column = bucket_on(2);

    assert!(!on_id_column.is_compatible_with(&on_name_column.into_unbound()));
}
+
#[test]
fn test_not_compatible_with_order_different() {
    let int_type = Type::Primitive(crate::spec::PrimitiveType::Int);
    let string_type = Type::Primitive(crate::spec::PrimitiveType::String);
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", int_type).into(),
            NestedField::required(2, "name", string_type).into(),
        ])
        .build()
        .unwrap();

    let id_bucket = || UnboundPartitionField {
        source_id: 1,
        field_id: None,
        name: "id_bucket".to_string(),
        transform: Transform::Bucket(16),
    };
    let name_identity = || UnboundPartitionField {
        source_id: 2,
        field_id: None,
        name: "name".to_string(),
        transform: Transform::Identity,
    };

    // Builds a two-field spec with the given fields in the given order.
    let spec_of = |first: UnboundPartitionField, second: UnboundPartitionField| {
        PartitionSpec::builder(&schema)
            .with_spec_id(1)
            .add_unbound_field(first)
            .unwrap()
            .add_unbound_field(second)
            .unwrap()
            .build()
            .unwrap()
    };

    // Same two fields, opposite order: compatibility is positional.
    let bucket_then_name = spec_of(id_bucket(), name_identity());
    let name_then_bucket = spec_of(name_identity(), id_bucket());

    assert!(!bucket_then_name.is_compatible_with(&name_then_bucket.into_unbound()));
}
+
#[test]
fn test_highest_field_id_unpartitioned() {
    let empty_schema = Schema::builder().with_fields(vec![]).build().unwrap();

    // A spec without partition fields falls back to the unpartitioned sentinel.
    let unpartitioned = PartitionSpec::builder(&empty_schema)
        .with_spec_id(1)
        .build()
        .unwrap();

    assert_eq!(unpartitioned.highest_field_id(), UNPARTITIONED_LAST_ASSIGNED_ID);
}
+
#[test]
fn test_highest_field_id() {
    let id_column = NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int));
    let name_column = NestedField::required(
        2,
        "name",
        Type::Primitive(crate::spec::PrimitiveType::String),
    );
    let schema = Schema::builder()
        .with_fields(vec![id_column.into(), name_column.into()])
        .build()
        .unwrap();

    let identity_field = |source_id: i32, field_id: i32, field_name: &str| UnboundPartitionField {
        source_id,
        field_id: Some(field_id),
        name: field_name.to_string(),
        transform: Transform::Identity,
    };

    // Ids are deliberately added out of order: the maximum must be reported,
    // not the id of the last field.
    let spec = PartitionSpec::builder(&schema)
        .with_spec_id(1)
        .add_unbound_field(identity_field(1, 1001, "id"))
        .unwrap()
        .add_unbound_field(identity_field(2, 1000, "name"))
        .unwrap()
        .build()
        .unwrap();

    assert_eq!(spec.highest_field_id(), 1001);
}
+
#[test]
fn test_has_sequential_ids() {
    let id_column = NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int));
    let name_column = NestedField::required(
        2,
        "name",
        Type::Primitive(crate::spec::PrimitiveType::String),
    );
    let schema = Schema::builder()
        .with_fields(vec![id_column.into(), name_column.into()])
        .build()
        .unwrap();

    let identity_field = |source_id: i32, field_id: i32, field_name: &str| UnboundPartitionField {
        source_id,
        field_id: Some(field_id),
        name: field_name.to_string(),
        transform: Transform::Identity,
    };

    let spec = PartitionSpec::builder(&schema)
        .with_spec_id(1)
        .add_unbound_field(identity_field(1, 1000, "id"))
        .unwrap()
        .add_unbound_field(identity_field(2, 1001, "name"))
        .unwrap()
        .build()
        .unwrap();

    // 1000, 1001: starts right after the unpartitioned sentinel, no gaps.
    let assigned_ids: Vec<i32> = spec.fields.iter().map(|field| field.field_id).collect();
    assert_eq!(assigned_ids, [1000, 1001]);
    assert!(spec.has_sequential_ids());
}
+
#[test]
fn test_sequential_ids_must_start_at_1000() {
    let id_column = NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int));
    let name_column = NestedField::required(
        2,
        "name",
        Type::Primitive(crate::spec::PrimitiveType::String),
    );
    let schema = Schema::builder()
        .with_fields(vec![id_column.into(), name_column.into()])
        .build()
        .unwrap();

    let identity_field = |source_id: i32, field_id: i32, field_name: &str| UnboundPartitionField {
        source_id,
        field_id: Some(field_id),
        name: field_name.to_string(),
        transform: Transform::Identity,
    };

    // Contiguous ids, but starting one below the required first id of 1000.
    let spec = PartitionSpec::builder(&schema)
        .with_spec_id(1)
        .add_unbound_field(identity_field(1, 999, "id"))
        .unwrap()
        .add_unbound_field(identity_field(2, 1000, "name"))
        .unwrap()
        .build()
        .unwrap();

    let assigned_ids: Vec<i32> = spec.fields.iter().map(|field| field.field_id).collect();
    assert_eq!(assigned_ids, [999, 1000]);
    assert!(!spec.has_sequential_ids());
}
+
#[test]
fn test_sequential_ids_must_have_no_gaps() {
    let id_column = NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int));
    let name_column = NestedField::required(
        2,
        "name",
        Type::Primitive(crate::spec::PrimitiveType::String),
    );
    let schema = Schema::builder()
        .with_fields(vec![id_column.into(), name_column.into()])
        .build()
        .unwrap();

    let identity_field = |source_id: i32, field_id: i32, field_name: &str| UnboundPartitionField {
        source_id,
        field_id: Some(field_id),
        name: field_name.to_string(),
        transform: Transform::Identity,
    };

    // Correct starting id, but 1001 is skipped.
    let spec = PartitionSpec::builder(&schema)
        .with_spec_id(1)
        .add_unbound_field(identity_field(1, 1000, "id"))
        .unwrap()
        .add_unbound_field(identity_field(2, 1002, "name"))
        .unwrap()
        .build()
        .unwrap();

    let assigned_ids: Vec<i32> = spec.fields.iter().map(|field| field.field_id).collect();
    assert_eq!(assigned_ids, [1000, 1002]);
    assert!(!spec.has_sequential_ids());
}
}