This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 9862026  feat: Partition Binding and safe PartitionSpecBuilder (#491)
9862026 is described below

commit 9862026b9f3c885a82e7b8b8da414c0c97436537
Author: Christian <[email protected]>
AuthorDate: Wed Aug 14 16:30:26 2024 +0200

    feat: Partition Binding and safe PartitionSpecBuilder (#491)
    
    * Initial commit
    
    * Fixes
    
    * Replace UnboundPartitionSpec Builder
    
    * Fix tests, allow year, month day partition
    
    * Comments
    
    * typos
    
    * Fix UnboundBuild setting partition_id
    
    * Add test for unbound spec without partition ids
    
    * Fix into_unbound fn name
    
    * Split bound & unbound Partition builder, change add_partition_fields
    
    * Improve comment
    
    * Fix fmt
    
    * Review fixes
    
    * Remove partition_names() HashSet creation
---
 crates/catalog/memory/src/catalog.rs               |   3 +-
 crates/catalog/rest/src/catalog.rs                 |   6 +-
 crates/iceberg/src/catalog/mod.rs                  |  34 +-
 .../src/expr/visitors/expression_evaluator.rs      |  14 +-
 .../expr/visitors/inclusive_metrics_evaluator.rs   |  11 +-
 .../src/expr/visitors/inclusive_projection.rs      | 110 +--
 crates/iceberg/src/spec/manifest.rs                |   8 +-
 crates/iceberg/src/spec/partition.rs               | 854 ++++++++++++++++++++-
 crates/iceberg/src/spec/table_metadata.rs          |  78 +-
 9 files changed, 945 insertions(+), 173 deletions(-)

diff --git a/crates/catalog/memory/src/catalog.rs 
b/crates/catalog/memory/src/catalog.rs
index d86bbfe..44086f8 100644
--- a/crates/catalog/memory/src/catalog.rs
+++ b/crates/catalog/memory/src/catalog.rs
@@ -358,9 +358,8 @@ mod tests {
 
         assert_eq!(metadata.current_schema().as_ref(), expected_schema);
 
-        let expected_partition_spec = PartitionSpec::builder()
+        let expected_partition_spec = PartitionSpec::builder(expected_schema)
             .with_spec_id(0)
-            .with_fields(vec![])
             .build()
             .unwrap();
 
diff --git a/crates/catalog/rest/src/catalog.rs 
b/crates/catalog/rest/src/catalog.rs
index 30f2e29..d74c8de 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -1467,13 +1467,13 @@ mod tests {
             .properties(HashMap::from([("owner".to_string(), 
"testx".to_string())]))
             .partition_spec(
                 UnboundPartitionSpec::builder()
-                    .with_fields(vec![UnboundPartitionField::builder()
+                    .add_partition_fields(vec![UnboundPartitionField::builder()
                         .source_id(1)
                         .transform(Transform::Truncate(3))
                         .name("id".to_string())
                         .build()])
-                    .build()
-                    .unwrap(),
+                    .unwrap()
+                    .build(),
             )
             .sort_order(
                 SortOrder::builder()
diff --git a/crates/iceberg/src/catalog/mod.rs 
b/crates/iceberg/src/catalog/mod.rs
index bc98772..aa2311b 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -229,7 +229,7 @@ pub struct TableCreation {
     /// The schema of the table.
     pub schema: Schema,
     /// The partition spec of the table, could be None.
-    #[builder(default, setter(strip_option))]
+    #[builder(default, setter(strip_option, into))]
     pub partition_spec: Option<UnboundPartitionSpec>,
     /// The sort order of the table.
     #[builder(default, setter(strip_option))]
@@ -476,7 +476,7 @@ mod tests {
     use crate::spec::{
         FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, 
Schema, Snapshot,
         SnapshotReference, SnapshotRetention, SortDirection, SortField, 
SortOrder, Summary,
-        TableMetadataBuilder, Transform, Type, UnboundPartitionField, 
UnboundPartitionSpec,
+        TableMetadataBuilder, Transform, Type, UnboundPartitionSpec,
     };
     use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, 
TableUpdate};
 
@@ -820,29 +820,13 @@ mod tests {
         "#,
             TableUpdate::AddSpec {
                 spec: UnboundPartitionSpec::builder()
-                    .with_unbound_partition_field(
-                        UnboundPartitionField::builder()
-                            .source_id(4)
-                            .name("ts_day".to_string())
-                            .transform(Transform::Day)
-                            .build(),
-                    )
-                    .with_unbound_partition_field(
-                        UnboundPartitionField::builder()
-                            .source_id(1)
-                            .name("id_bucket".to_string())
-                            .transform(Transform::Bucket(16))
-                            .build(),
-                    )
-                    .with_unbound_partition_field(
-                        UnboundPartitionField::builder()
-                            .source_id(2)
-                            .name("id_truncate".to_string())
-                            .transform(Transform::Truncate(4))
-                            .build(),
-                    )
-                    .build()
-                    .unwrap(),
+                    .add_partition_field(4, "ts_day".to_string(), 
Transform::Day)
+                    .unwrap()
+                    .add_partition_field(1, "id_bucket".to_string(), 
Transform::Bucket(16))
+                    .unwrap()
+                    .add_partition_field(2, "id_truncate".to_string(), 
Transform::Truncate(4))
+                    .unwrap()
+                    .build(),
             },
         );
     }
diff --git a/crates/iceberg/src/expr/visitors/expression_evaluator.rs 
b/crates/iceberg/src/expr/visitors/expression_evaluator.rs
index 3700a9b..d8a47ec 100644
--- a/crates/iceberg/src/expr/visitors/expression_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/expression_evaluator.rs
@@ -258,8 +258,9 @@ mod tests {
         UnaryExpression,
     };
     use crate::spec::{
-        DataContentType, DataFile, DataFileFormat, Datum, Literal, 
NestedField, PartitionField,
-        PartitionSpec, PartitionSpecRef, PrimitiveType, Schema, SchemaRef, 
Struct, Transform, Type,
+        DataContentType, DataFile, DataFileFormat, Datum, Literal, 
NestedField, PartitionSpec,
+        PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform, 
Type,
+        UnboundPartitionField,
     };
     use crate::Result;
 
@@ -274,14 +275,15 @@ mod tests {
             ))])
             .build()?;
 
-        let spec = PartitionSpec::builder()
+        let spec = PartitionSpec::builder(&schema)
             .with_spec_id(1)
-            .with_fields(vec![PartitionField::builder()
+            .add_unbound_fields(vec![UnboundPartitionField::builder()
                 .source_id(1)
                 .name("a".to_string())
-                .field_id(1)
+                .partition_id(1)
                 .transform(Transform::Identity)
                 .build()])
+            .unwrap()
             .build()
             .unwrap();
 
@@ -298,7 +300,7 @@ mod tests {
         let partition_fields = partition_type.fields().to_owned();
 
         let partition_schema = Schema::builder()
-            .with_schema_id(partition_spec.spec_id)
+            .with_schema_id(partition_spec.spec_id())
             .with_fields(partition_fields)
             .build()?;
 
diff --git a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs 
b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
index 430ebfc..e8e7337 100644
--- a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
@@ -495,8 +495,8 @@ mod test {
         UnaryExpression,
     };
     use crate::spec::{
-        DataContentType, DataFile, DataFileFormat, Datum, NestedField, 
PartitionField,
-        PartitionSpec, PrimitiveType, Schema, Struct, Transform, Type,
+        DataContentType, DataFile, DataFileFormat, Datum, NestedField, 
PartitionSpec,
+        PrimitiveType, Schema, Struct, Transform, Type, UnboundPartitionField,
     };
 
     const INT_MIN_VALUE: i32 = 30;
@@ -1656,14 +1656,15 @@ mod test {
             .unwrap();
         let table_schema_ref = Arc::new(table_schema);
 
-        let partition_spec = PartitionSpec::builder()
+        let partition_spec = PartitionSpec::builder(&table_schema_ref)
             .with_spec_id(1)
-            .with_fields(vec![PartitionField::builder()
+            .add_unbound_fields(vec![UnboundPartitionField::builder()
                 .source_id(1)
                 .name("a".to_string())
-                .field_id(1)
+                .partition_id(1)
                 .transform(Transform::Identity)
                 .build()])
+            .unwrap()
             .build()
             .unwrap();
         let partition_spec_ref = Arc::new(partition_spec);
diff --git a/crates/iceberg/src/expr/visitors/inclusive_projection.rs 
b/crates/iceberg/src/expr/visitors/inclusive_projection.rs
index 9cfbb4f..716f086 100644
--- a/crates/iceberg/src/expr/visitors/inclusive_projection.rs
+++ b/crates/iceberg/src/expr/visitors/inclusive_projection.rs
@@ -40,7 +40,7 @@ impl InclusiveProjection {
     fn get_parts_for_field_id(&mut self, field_id: i32) -> 
&Vec<PartitionField> {
         if let std::collections::hash_map::Entry::Vacant(e) = 
self.cached_parts.entry(field_id) {
             let mut parts: Vec<PartitionField> = vec![];
-            for partition_spec_field in &self.partition_spec.fields {
+            for partition_spec_field in self.partition_spec.fields() {
                 if partition_spec_field.source_id == field_id {
                     parts.push(partition_spec_field.clone())
                 }
@@ -236,6 +236,7 @@ mod tests {
     use crate::expr::{Bind, Predicate, Reference};
     use crate::spec::{
         Datum, NestedField, PartitionField, PartitionSpec, PrimitiveType, 
Schema, Transform, Type,
+        UnboundPartitionField,
     };
 
     fn build_test_schema() -> Schema {
@@ -265,9 +266,8 @@ mod tests {
     fn test_inclusive_projection_logic_ops() {
         let schema = build_test_schema();
 
-        let partition_spec = PartitionSpec::builder()
+        let partition_spec = PartitionSpec::builder(&schema)
             .with_spec_id(1)
-            .with_fields(vec![])
             .build()
             .unwrap();
 
@@ -296,14 +296,17 @@ mod tests {
     fn test_inclusive_projection_identity_transform() {
         let schema = build_test_schema();
 
-        let partition_spec = PartitionSpec::builder()
+        let partition_spec = PartitionSpec::builder(&schema)
             .with_spec_id(1)
-            .with_fields(vec![PartitionField::builder()
-                .source_id(1)
-                .name("a".to_string())
-                .field_id(1)
-                .transform(Transform::Identity)
-                .build()])
+            .add_unbound_field(
+                UnboundPartitionField::builder()
+                    .source_id(1)
+                    .name("a".to_string())
+                    .partition_id(1)
+                    .transform(Transform::Identity)
+                    .build(),
+            )
+            .unwrap()
             .build()
             .unwrap();
 
@@ -330,30 +333,29 @@ mod tests {
     fn test_inclusive_projection_date_transforms() {
         let schema = build_test_schema();
 
-        let partition_spec = PartitionSpec::builder()
-            .with_spec_id(1)
-            .with_fields(vec![
-                PartitionField::builder()
-                    .source_id(2)
-                    .name("year".to_string())
-                    .field_id(2)
-                    .transform(Transform::Year)
-                    .build(),
-                PartitionField::builder()
-                    .source_id(2)
-                    .name("month".to_string())
-                    .field_id(2)
-                    .transform(Transform::Month)
-                    .build(),
-                PartitionField::builder()
-                    .source_id(2)
-                    .name("day".to_string())
-                    .field_id(2)
-                    .transform(Transform::Day)
-                    .build(),
-            ])
-            .build()
-            .unwrap();
+        let partition_spec = PartitionSpec {
+            spec_id: 1,
+            fields: vec![
+                PartitionField {
+                    source_id: 2,
+                    name: "year".to_string(),
+                    field_id: 1000,
+                    transform: Transform::Year,
+                },
+                PartitionField {
+                    source_id: 2,
+                    name: "month".to_string(),
+                    field_id: 1001,
+                    transform: Transform::Month,
+                },
+                PartitionField {
+                    source_id: 2,
+                    name: "day".to_string(),
+                    field_id: 1002,
+                    transform: Transform::Day,
+                },
+            ],
+        };
 
         let arc_schema = Arc::new(schema);
         let arc_partition_spec = Arc::new(partition_spec);
@@ -378,14 +380,17 @@ mod tests {
     fn test_inclusive_projection_truncate_transform() {
         let schema = build_test_schema();
 
-        let partition_spec = PartitionSpec::builder()
+        let partition_spec = PartitionSpec::builder(&schema)
             .with_spec_id(1)
-            .with_fields(vec![PartitionField::builder()
-                .source_id(3)
-                .name("name".to_string())
-                .field_id(3)
-                .transform(Transform::Truncate(4))
-                .build()])
+            .add_unbound_field(
+                UnboundPartitionField::builder()
+                    .source_id(3)
+                    .name("name_truncate".to_string())
+                    .partition_id(3)
+                    .transform(Transform::Truncate(4))
+                    .build(),
+            )
+            .unwrap()
             .build()
             .unwrap();
 
@@ -398,7 +403,7 @@ mod tests {
 
         // applying InclusiveProjection to bound_predicate
         // should result in the 'name STARTS WITH "Testy McTest"'
-        // predicate being transformed to 'name STARTS WITH "Test"',
+        // predicate being transformed to 'name_truncate STARTS WITH "Test"',
         // since a `Truncate(4)` partition will map values of
         // name that start with "Testy McTest" into a partition
         // for values of name that start with the first four letters
@@ -406,7 +411,7 @@ mod tests {
         let mut inclusive_projection = 
InclusiveProjection::new(arc_partition_spec.clone());
         let result = inclusive_projection.project(&bound_predicate).unwrap();
 
-        let expected = "name STARTS WITH \"Test\"".to_string();
+        let expected = "name_truncate STARTS WITH \"Test\"".to_string();
 
         assert_eq!(result.to_string(), expected)
     }
@@ -415,14 +420,17 @@ mod tests {
     fn test_inclusive_projection_bucket_transform() {
         let schema = build_test_schema();
 
-        let partition_spec = PartitionSpec::builder()
+        let partition_spec = PartitionSpec::builder(&schema)
             .with_spec_id(1)
-            .with_fields(vec![PartitionField::builder()
-                .source_id(1)
-                .name("a".to_string())
-                .field_id(1)
-                .transform(Transform::Bucket(7))
-                .build()])
+            .add_unbound_field(
+                UnboundPartitionField::builder()
+                    .source_id(1)
+                    .name("a_bucket[7]".to_string())
+                    .partition_id(1)
+                    .transform(Transform::Bucket(7))
+                    .build(),
+            )
+            .unwrap()
             .build()
             .unwrap();
 
@@ -440,7 +448,7 @@ mod tests {
         let mut inclusive_projection = 
InclusiveProjection::new(arc_partition_spec.clone());
         let result = inclusive_projection.project(&bound_predicate).unwrap();
 
-        let expected = "a = 2".to_string();
+        let expected = "a_bucket[7] = 2".to_string();
 
         assert_eq!(result.to_string(), expected)
     }
diff --git a/crates/iceberg/src/spec/manifest.rs 
b/crates/iceberg/src/spec/manifest.rs
index e2f8251..14b8a80 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -227,14 +227,14 @@ impl ManifestWriter {
         )?;
         avro_writer.add_user_metadata(
             "partition-spec".to_string(),
-            to_vec(&manifest.metadata.partition_spec.fields).map_err(|err| {
+            to_vec(&manifest.metadata.partition_spec.fields()).map_err(|err| {
                 Error::new(ErrorKind::DataInvalid, "Fail to serialize 
partition spec")
                     .with_source(err)
             })?,
         )?;
         avro_writer.add_user_metadata(
             "partition-spec-id".to_string(),
-            manifest.metadata.partition_spec.spec_id.to_string(),
+            manifest.metadata.partition_spec.spec_id().to_string(),
         )?;
         avro_writer.add_user_metadata(
             "format-version".to_string(),
@@ -300,12 +300,12 @@ impl ManifestWriter {
         self.output.write(Bytes::from(content)).await?;
 
         let partition_summary =
-            
self.get_field_summary_vec(&manifest.metadata.partition_spec.fields);
+            
self.get_field_summary_vec(manifest.metadata.partition_spec.fields());
 
         Ok(ManifestFile {
             manifest_path: self.output.location().to_string(),
             manifest_length: length as i64,
-            partition_spec_id: manifest.metadata.partition_spec.spec_id,
+            partition_spec_id: manifest.metadata.partition_spec.spec_id(),
             content: manifest.metadata.content,
             // sequence_number and min_sequence_number with 
UNASSIGNED_SEQUENCE_NUMBER will be replace with
             // real sequence number in `ManifestListWriter`.
diff --git a/crates/iceberg/src/spec/partition.rs 
b/crates/iceberg/src/spec/partition.rs
index f1244e4..055b48e 100644
--- a/crates/iceberg/src/spec/partition.rs
+++ b/crates/iceberg/src/spec/partition.rs
@@ -25,7 +25,10 @@ use typed_builder::TypedBuilder;
 
 use super::transform::Transform;
 use super::{NestedField, Schema, StructType};
-use crate::{Error, ErrorKind};
+use crate::{Error, ErrorKind, Result};
+
+pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999;
+pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0;
 
 /// Reference to [`PartitionSpec`].
 pub type PartitionSpecRef = Arc<PartitionSpec>;
@@ -44,22 +47,37 @@ pub struct PartitionField {
     pub transform: Transform,
 }
 
+impl PartitionField {
+    /// To unbound partition field
+    pub fn into_unbound(self) -> UnboundPartitionField {
+        self.into()
+    }
+}
+
 ///  Partition spec that defines how to produce a tuple of partition values 
from a record.
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, 
Builder)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
 #[serde(rename_all = "kebab-case")]
-#[builder(setter(prefix = "with"))]
 pub struct PartitionSpec {
     /// Identifier for PartitionSpec
-    pub spec_id: i32,
+    pub(crate) spec_id: i32,
     /// Details of the partition spec
-    #[builder(setter(each(name = "with_partition_field")))]
-    pub fields: Vec<PartitionField>,
+    pub(crate) fields: Vec<PartitionField>,
 }
 
 impl PartitionSpec {
     /// Create partition spec builer
-    pub fn builder() -> PartitionSpecBuilder {
-        PartitionSpecBuilder::default()
+    pub fn builder(schema: &Schema) -> PartitionSpecBuilder {
+        PartitionSpecBuilder::new(schema)
+    }
+
+    /// Spec id of the partition spec
+    pub fn spec_id(&self) -> i32 {
+        self.spec_id
+    }
+
+    /// Fields of the partition spec
+    pub fn fields(&self) -> &[PartitionField] {
+        &self.fields
     }
 
     /// Returns if the partition spec is unpartitioned.
@@ -74,7 +92,7 @@ impl PartitionSpec {
     }
 
     /// Returns the partition type of this partition spec.
-    pub fn partition_type(&self, schema: &Schema) -> Result<StructType, Error> 
{
+    pub fn partition_type(&self, schema: &Schema) -> Result<StructType> {
         let mut fields = Vec::with_capacity(self.fields.len());
         for partition_field in &self.fields {
             let field = schema
@@ -96,6 +114,13 @@ impl PartitionSpec {
         }
         Ok(StructType::new(fields))
     }
+
+    /// Turn this partition spec into an unbound partition spec.
+    ///
+    /// The `field_id` is retained as `partition_id` in the unbound partition 
spec.
+    pub fn to_unbound(self) -> UnboundPartitionSpec {
+        self.into()
+    }
 }
 
 /// Reference to [`UnboundPartitionSpec`].
@@ -117,16 +142,13 @@ pub struct UnboundPartitionField {
 }
 
 /// Unbound partition spec can be built without a schema and later bound to a 
schema.
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, 
Builder)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
 #[serde(rename_all = "kebab-case")]
-#[builder(setter(prefix = "with"))]
 pub struct UnboundPartitionSpec {
     /// Identifier for PartitionSpec
-    #[builder(default, setter(strip_option))]
-    pub spec_id: Option<i32>,
+    pub(crate) spec_id: Option<i32>,
     /// Details of the partition spec
-    #[builder(setter(each(name = "with_unbound_partition_field")))]
-    pub fields: Vec<UnboundPartitionField>,
+    pub(crate) fields: Vec<UnboundPartitionField>,
 }
 
 impl UnboundPartitionSpec {
@@ -134,6 +156,424 @@ impl UnboundPartitionSpec {
     pub fn builder() -> UnboundPartitionSpecBuilder {
         UnboundPartitionSpecBuilder::default()
     }
+
+    /// Bind this unbound partition spec to a schema.
+    pub fn bind(self, schema: &Schema) -> Result<PartitionSpec> {
+        PartitionSpecBuilder::new_from_unbound(self, schema)?.build()
+    }
+
+    /// Spec id of the partition spec
+    pub fn spec_id(&self) -> Option<i32> {
+        self.spec_id
+    }
+
+    /// Fields of the partition spec
+    pub fn fields(&self) -> &[UnboundPartitionField] {
+        &self.fields
+    }
+}
+
+impl From<PartitionField> for UnboundPartitionField {
+    fn from(field: PartitionField) -> Self {
+        UnboundPartitionField {
+            source_id: field.source_id,
+            partition_id: Some(field.field_id),
+            name: field.name,
+            transform: field.transform,
+        }
+    }
+}
+
+impl From<PartitionSpec> for UnboundPartitionSpec {
+    fn from(spec: PartitionSpec) -> Self {
+        UnboundPartitionSpec {
+            spec_id: Some(spec.spec_id),
+            fields: spec.fields.into_iter().map(Into::into).collect(),
+        }
+    }
+}
+
+/// Create a new UnboundPartitionSpec
+#[derive(Debug, Default)]
+pub struct UnboundPartitionSpecBuilder {
+    spec_id: Option<i32>,
+    fields: Vec<UnboundPartitionField>,
+}
+
+impl UnboundPartitionSpecBuilder {
+    /// Create a new partition spec builder with the given schema.
+    pub fn new() -> Self {
+        Self {
+            spec_id: None,
+            fields: vec![],
+        }
+    }
+
+    /// Set the spec id for the partition spec.
+    pub fn with_spec_id(mut self, spec_id: i32) -> Self {
+        self.spec_id = Some(spec_id);
+        self
+    }
+
+    /// Add a new partition field to the partition spec from an unbound 
partition field.
+    pub fn add_partition_field(
+        self,
+        source_id: i32,
+        target_name: impl ToString,
+        transformation: Transform,
+    ) -> Result<Self> {
+        let field = UnboundPartitionField {
+            source_id,
+            partition_id: None,
+            name: target_name.to_string(),
+            transform: transformation,
+        };
+        self.add_partition_field_internal(field)
+    }
+
+    /// Add multiple partition fields to the partition spec.
+    pub fn add_partition_fields(
+        self,
+        fields: impl IntoIterator<Item = UnboundPartitionField>,
+    ) -> Result<Self> {
+        let mut builder = self;
+        for field in fields {
+            builder = builder.add_partition_field_internal(field)?;
+        }
+        Ok(builder)
+    }
+
+    fn add_partition_field_internal(mut self, field: UnboundPartitionField) -> 
Result<Self> {
+        self.check_name_set_and_unique(&field.name)?;
+        self.check_for_redundant_partitions(field.source_id, 
&field.transform)?;
+        if let Some(partition_id) = field.partition_id {
+            self.check_partition_id_unique(partition_id)?;
+        }
+        self.fields.push(field);
+        Ok(self)
+    }
+
+    /// Build the unbound partition spec.
+    pub fn build(self) -> UnboundPartitionSpec {
+        UnboundPartitionSpec {
+            spec_id: self.spec_id,
+            fields: self.fields,
+        }
+    }
+}
+
+/// Create valid partition specs for a given schema.
+#[derive(Debug)]
+pub struct PartitionSpecBuilder<'a> {
+    spec_id: Option<i32>,
+    last_assigned_field_id: i32,
+    fields: Vec<UnboundPartitionField>,
+    schema: &'a Schema,
+}
+
+impl<'a> PartitionSpecBuilder<'a> {
+    /// Create a new partition spec builder with the given schema.
+    pub fn new(schema: &'a Schema) -> Self {
+        Self {
+            spec_id: None,
+            fields: vec![],
+            last_assigned_field_id: UNPARTITIONED_LAST_ASSIGNED_ID,
+            schema,
+        }
+    }
+
+    /// Create a new partition spec builder from an existing unbound partition 
spec.
+    pub fn new_from_unbound(unbound: UnboundPartitionSpec, schema: &'a Schema) 
-> Result<Self> {
+        let mut builder =
+            
Self::new(schema).with_spec_id(unbound.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID));
+
+        for field in unbound.fields {
+            builder = builder.add_unbound_field(field)?;
+        }
+        Ok(builder)
+    }
+
+    /// Set the last assigned field id for the partition spec.
+    ///
+    /// Set this field when a new partition spec is created for an existing 
TableMetaData.
+    /// As `field_id` must be unique in V2 metadata, this should be set to
+    /// the highest field id used previously.
+    pub fn with_last_assigned_field_id(mut self, last_assigned_field_id: i32) 
-> Self {
+        self.last_assigned_field_id = last_assigned_field_id;
+        self
+    }
+
+    /// Set the spec id for the partition spec.
+    pub fn with_spec_id(mut self, spec_id: i32) -> Self {
+        self.spec_id = Some(spec_id);
+        self
+    }
+
+    /// Add a new partition field to the partition spec.
+    pub fn add_partition_field(
+        self,
+        source_name: impl AsRef<str>,
+        target_name: impl Into<String>,
+        transform: Transform,
+    ) -> Result<Self> {
+        let source_id = self
+            .schema
+            .field_by_name(source_name.as_ref())
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Cannot find source column with name: {} in schema",
+                        source_name.as_ref()
+                    ),
+                )
+            })?
+            .id;
+        let field = UnboundPartitionField {
+            source_id,
+            partition_id: None,
+            name: target_name.into(),
+            transform,
+        };
+
+        self.add_unbound_field(field)
+    }
+
+    /// Add a new partition field to the partition spec.
+    ///
+    /// If `partition_id` is set, it is used as the field id.
+    /// Otherwise, a new `field_id` is assigned.
+    pub fn add_unbound_field(mut self, field: UnboundPartitionField) -> 
Result<Self> {
+        self.check_name_set_and_unique(&field.name)?;
+        self.check_for_redundant_partitions(field.source_id, 
&field.transform)?;
+        Self::check_name_does_not_collide_with_schema(&field, self.schema)?;
+        Self::check_transform_compatibility(&field, self.schema)?;
+        if let Some(partition_id) = field.partition_id {
+            self.check_partition_id_unique(partition_id)?;
+        }
+
+        // Non-fallible from here
+        self.fields.push(field);
+        Ok(self)
+    }
+
+    /// Wrapper around `with_unbound_fields` to add multiple partition fields.
+    pub fn add_unbound_fields(
+        self,
+        fields: impl IntoIterator<Item = UnboundPartitionField>,
+    ) -> Result<Self> {
+        let mut builder = self;
+        for field in fields {
+            builder = builder.add_unbound_field(field)?;
+        }
+        Ok(builder)
+    }
+
+    /// Build a bound partition spec with the given schema.
+    pub fn build(self) -> Result<PartitionSpec> {
+        let fields = Self::set_field_ids(self.fields, 
self.last_assigned_field_id)?;
+        Ok(PartitionSpec {
+            spec_id: self.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID),
+            fields,
+        })
+    }
+
+    fn set_field_ids(
+        fields: Vec<UnboundPartitionField>,
+        last_assigned_field_id: i32,
+    ) -> Result<Vec<PartitionField>> {
+        let mut last_assigned_field_id = last_assigned_field_id;
+        // Already assigned partition ids. If we see one of these during 
iteration,
+        // we skip it.
+        let assigned_ids = fields
+            .iter()
+            .filter_map(|f| f.partition_id)
+            .collect::<std::collections::HashSet<_>>();
+
+        fn _check_add_1(prev: i32) -> Result<i32> {
+            prev.checked_add(1).ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Cannot assign more partition ids. Overflow.",
+                )
+            })
+        }
+
+        let mut bound_fields = Vec::with_capacity(fields.len());
+        for field in fields.into_iter() {
+            let partition_id = if let Some(partition_id) = field.partition_id {
+                last_assigned_field_id = std::cmp::max(last_assigned_field_id, 
partition_id);
+                partition_id
+            } else {
+                last_assigned_field_id = _check_add_1(last_assigned_field_id)?;
+                while assigned_ids.contains(&last_assigned_field_id) {
+                    last_assigned_field_id = 
_check_add_1(last_assigned_field_id)?;
+                }
+                last_assigned_field_id
+            };
+
+            bound_fields.push(PartitionField {
+                source_id: field.source_id,
+                field_id: partition_id,
+                name: field.name,
+                transform: field.transform,
+            })
+        }
+
+        Ok(bound_fields)
+    }
+
+    /// Ensure that the partition name is unique among columns in the schema.
+    /// Duplicate names are allowed if:
+    /// 1. The column is sourced from the column with the same name.
+    /// 2. AND the transformation is identity
+    fn check_name_does_not_collide_with_schema(
+        field: &UnboundPartitionField,
+        schema: &Schema,
+    ) -> Result<()> {
+        match schema.field_by_name(field.name.as_str()) {
+            Some(schema_collision) => {
+                if field.transform == Transform::Identity {
+                    if schema_collision.id == field.source_id {
+                        Ok(())
+                    } else {
+                        Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            format!(
+                                "Cannot create identity partition sourced from 
different field in schema. Field name '{}' has id `{}` in schema but partition 
source id is `{}`",
+                                field.name, schema_collision.id, 
field.source_id
+                            ),
+                        ))
+                    }
+                } else {
+                    Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Cannot create partition with name: '{}' that 
conflicts with schema field and is not an identity transform.",
+                            field.name
+                        ),
+                    ))
+                }
+            }
+            None => Ok(()),
+        }
+    }
+
+    /// Ensure that the transformation of the field is compatible with type of 
the field
+    /// in the schema. Implicitly also checks if the source field exists in 
the schema.
+    fn check_transform_compatibility(field: &UnboundPartitionField, schema: 
&Schema) -> Result<()> {
+        let schema_field = schema.field_by_id(field.source_id).ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot find partition source field with id `{}` in 
schema",
+                    field.source_id
+                ),
+            )
+        })?;
+
+        if field.transform != Transform::Void {
+            if !schema_field.field_type.is_primitive() {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Cannot partition by non-primitive source field: 
'{}'.",
+                        schema_field.field_type
+                    ),
+                ));
+            }
+
+            if field
+                .transform
+                .result_type(&schema_field.field_type)
+                .is_err()
+            {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Invalid source type: '{}' for transform: '{}'.",
+                        schema_field.field_type,
+                        field.transform.dedup_name()
+                    ),
+                ));
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/// Contains checks that are common to both PartitionSpecBuilder and 
UnboundPartitionSpecBuilder
+trait CorePartitionSpecValidator {
+    /// Ensure that the partition name is unique among the partition fields 
and is not empty.
+    fn check_name_set_and_unique(&self, name: &str) -> Result<()> {
+        if name.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Cannot use empty partition name",
+            ));
+        }
+
+        if self.fields().iter().any(|f| f.name == name) {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Cannot use partition name more than once: {}", name),
+            ));
+        }
+        Ok(())
+    }
+
+    /// For a single source-column transformations must be unique.
+    fn check_for_redundant_partitions(&self, source_id: i32, transform: 
&Transform) -> Result<()> {
+        let collision = self.fields().iter().find(|f| {
+            f.source_id == source_id && f.transform.dedup_name() == 
transform.dedup_name()
+        });
+
+        if let Some(collision) = collision {
+            Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Cannot add redundant partition with source id `{}` 
and transform `{}`. A partition with the same source id and transform already 
exists with name `{}`",
+                        source_id, transform.dedup_name(), collision.name
+                    ),
+                ))
+        } else {
+            Ok(())
+        }
+    }
+
+    /// Check field / partition_id unique within the partition spec if set
+    fn check_partition_id_unique(&self, field_id: i32) -> Result<()> {
+        if self
+            .fields()
+            .iter()
+            .any(|f| f.partition_id == Some(field_id))
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot use field id more than once in one PartitionSpec: 
{}",
+                    field_id
+                ),
+            ));
+        }
+
+        Ok(())
+    }
+
+    fn fields(&self) -> &Vec<UnboundPartitionField>;
+}
+
+impl CorePartitionSpecValidator for PartitionSpecBuilder<'_> {
+    fn fields(&self) -> &Vec<UnboundPartitionField> {
+        &self.fields
+    }
+}
+
+impl CorePartitionSpecValidator for UnboundPartitionSpecBuilder {
+    fn fields(&self) -> &Vec<UnboundPartitionField> {
+        &self.fields
+    }
 }
 
 #[cfg(test)]
@@ -184,9 +624,21 @@ mod tests {
 
     #[test]
     fn test_is_unpartitioned() {
-        let partition_spec = PartitionSpec::builder()
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+        let partition_spec = PartitionSpec::builder(&schema)
             .with_spec_id(1)
-            .with_fields(vec![])
             .build()
             .unwrap();
         assert!(
@@ -194,23 +646,20 @@ mod tests {
             "Empty partition spec should be unpartitioned"
         );
 
-        let partition_spec = PartitionSpec::builder()
-            .with_partition_field(
-                PartitionField::builder()
+        let partition_spec = PartitionSpec::builder(&schema)
+            .add_unbound_fields(vec![
+                UnboundPartitionField::builder()
                     .source_id(1)
-                    .field_id(1)
                     .name("id".to_string())
                     .transform(Transform::Identity)
                     .build(),
-            )
-            .with_partition_field(
-                PartitionField::builder()
+                UnboundPartitionField::builder()
                     .source_id(2)
-                    .field_id(2)
-                    .name("name".to_string())
+                    .name("name_string".to_string())
                     .transform(Transform::Void)
                     .build(),
-            )
+            ])
+            .unwrap()
             .with_spec_id(1)
             .build()
             .unwrap();
@@ -219,24 +668,21 @@ mod tests {
             "Partition spec with one non void transform should not be 
unpartitioned"
         );
 
-        let partition_spec = PartitionSpec::builder()
+        let partition_spec = PartitionSpec::builder(&schema)
             .with_spec_id(1)
-            .with_partition_field(
-                PartitionField::builder()
+            .add_unbound_fields(vec![
+                UnboundPartitionField::builder()
                     .source_id(1)
-                    .field_id(1)
-                    .name("id".to_string())
+                    .name("id_void".to_string())
                     .transform(Transform::Void)
                     .build(),
-            )
-            .with_partition_field(
-                PartitionField::builder()
+                UnboundPartitionField::builder()
                     .source_id(2)
-                    .field_id(2)
-                    .name("name".to_string())
+                    .name("name_void".to_string())
                     .transform(Transform::Void)
                     .build(),
-            )
+            ])
+            .unwrap()
             .build()
             .unwrap();
         assert!(
@@ -489,4 +935,336 @@ mod tests {
 
         assert!(partition_spec.partition_type(&schema).is_err());
     }
+
+    #[test]
+    fn test_builder_disallow_duplicate_names() {
+        UnboundPartitionSpec::builder()
+            .add_partition_field(1, "ts_day".to_string(), Transform::Day)
+            .unwrap()
+            .add_partition_field(2, "ts_day".to_string(), Transform::Day)
+            .unwrap_err();
+    }
+
+    #[test]
+    fn test_builder_disallow_duplicate_field_ids() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+        PartitionSpec::builder(&schema)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                partition_id: Some(1000),
+                name: "id".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 2,
+                partition_id: Some(1000),
+                name: "id_bucket".to_string(),
+                transform: Transform::Bucket(16),
+            })
+            .unwrap_err();
+    }
+
+    #[test]
+    fn test_builder_auto_assign_field_ids() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+                NestedField::required(
+                    3,
+                    "ts",
+                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+        let spec = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                name: "id".to_string(),
+                transform: Transform::Identity,
+                partition_id: Some(1012),
+            })
+            .unwrap()
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 2,
+                name: "name_void".to_string(),
+                transform: Transform::Void,
+                partition_id: None,
+            })
+            .unwrap()
+            // Should keep its ID even if its lower
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 3,
+                name: "year".to_string(),
+                transform: Transform::Year,
+                partition_id: Some(1),
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(1012, spec.fields[0].field_id);
+        assert_eq!(1013, spec.fields[1].field_id);
+        assert_eq!(1, spec.fields[2].field_id);
+    }
+
+    #[test]
+    fn test_builder_valid_schema() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .build()
+            .unwrap();
+
+        let spec = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_partition_field("id", "id_bucket[16]", Transform::Bucket(16))
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(spec, PartitionSpec {
+            spec_id: 1,
+            fields: vec![PartitionField {
+                source_id: 1,
+                field_id: 1000,
+                name: "id_bucket[16]".to_string(),
+                transform: Transform::Bucket(16),
+            }]
+        });
+    }
+
+    #[test]
+    fn test_collision_with_schema_name() {
+        let schema = Schema::builder()
+            .with_fields(vec![NestedField::required(
+                1,
+                "id",
+                Type::Primitive(crate::spec::PrimitiveType::Int),
+            )
+            .into()])
+            .build()
+            .unwrap();
+
+        PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .build()
+            .unwrap();
+
+        let err = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                partition_id: None,
+                name: "id".to_string(),
+                transform: Transform::Bucket(16),
+            })
+            .unwrap_err();
+        assert!(err.message().contains("conflicts with schema"))
+    }
+
+    #[test]
+    fn test_builder_collision_is_ok_for_identity_transforms() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "number",
+                    Type::Primitive(crate::spec::PrimitiveType::Int),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .build()
+            .unwrap();
+
+        PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                partition_id: None,
+                name: "id".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        // Not OK for different source id
+        PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 2,
+                partition_id: None,
+                name: "id".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap_err();
+    }
+
+    #[test]
+    fn test_builder_all_source_ids_must_exist() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+                NestedField::required(
+                    3,
+                    "ts",
+                    Type::Primitive(crate::spec::PrimitiveType::Timestamp),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        // Valid
+        PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_fields(vec![
+                UnboundPartitionField {
+                    source_id: 1,
+                    partition_id: None,
+                    name: "id_bucket".to_string(),
+                    transform: Transform::Bucket(16),
+                },
+                UnboundPartitionField {
+                    source_id: 2,
+                    partition_id: None,
+                    name: "name".to_string(),
+                    transform: Transform::Identity,
+                },
+            ])
+            .unwrap()
+            .build()
+            .unwrap();
+
+        // Invalid
+        PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_fields(vec![
+                UnboundPartitionField {
+                    source_id: 1,
+                    partition_id: None,
+                    name: "id_bucket".to_string(),
+                    transform: Transform::Bucket(16),
+                },
+                UnboundPartitionField {
+                    source_id: 4,
+                    partition_id: None,
+                    name: "name".to_string(),
+                    transform: Transform::Identity,
+                },
+            ])
+            .unwrap_err();
+    }
+
+    #[test]
+    fn test_builder_disallows_redundant() {
+        let err = UnboundPartitionSpec::builder()
+            .with_spec_id(1)
+            .add_partition_field(1, "id_bucket[16]".to_string(), 
Transform::Bucket(16))
+            .unwrap()
+            .add_partition_field(
+                1,
+                "id_bucket_with_other_name".to_string(),
+                Transform::Bucket(16),
+            )
+            .unwrap_err();
+        assert!(err.message().contains("redundant partition"));
+    }
+
+    #[test]
+    fn test_builder_incompatible_transforms_disallowed() {
+        let schema = Schema::builder()
+            .with_fields(vec![NestedField::required(
+                1,
+                "id",
+                Type::Primitive(crate::spec::PrimitiveType::Int),
+            )
+            .into()])
+            .build()
+            .unwrap();
+
+        PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                partition_id: None,
+                name: "id_year".to_string(),
+                transform: Transform::Year,
+            })
+            .unwrap_err();
+    }
+
+    #[test]
+    fn test_build_unbound_specs_without_partition_id() {
+        let spec = UnboundPartitionSpec::builder()
+            .with_spec_id(1)
+            .add_partition_fields(vec![UnboundPartitionField {
+                source_id: 1,
+                partition_id: None,
+                name: "id_bucket[16]".to_string(),
+                transform: Transform::Bucket(16),
+            }])
+            .unwrap()
+            .build();
+
+        assert_eq!(spec, UnboundPartitionSpec {
+            spec_id: Some(1),
+            fields: vec![UnboundPartitionField {
+                source_id: 1,
+                partition_id: None,
+                name: "id_bucket[16]".to_string(),
+                transform: Transform::Bucket(16),
+            }]
+        });
+    }
 }
diff --git a/crates/iceberg/src/spec/table_metadata.rs 
b/crates/iceberg/src/spec/table_metadata.rs
index 53bcabb..cd7f046 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -32,12 +32,12 @@ use uuid::Uuid;
 use super::snapshot::{Snapshot, SnapshotReference, SnapshotRetention};
 use super::{
     PartitionSpec, PartitionSpecRef, SchemaId, SchemaRef, SnapshotRef, 
SortOrder, SortOrderRef,
+    DEFAULT_PARTITION_SPEC_ID,
 };
 use crate::error::{timestamp_ms_to_utc, Result};
 use crate::{Error, ErrorKind, TableCreation};
 
 static MAIN_BRANCH: &str = "main";
-static DEFAULT_SPEC_ID: i32 = 0;
 static DEFAULT_SORT_ORDER_ID: i64 = 0;
 
 pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1;
@@ -187,8 +187,8 @@ impl TableMetadata {
     /// Get default partition spec
     #[inline]
     pub fn default_partition_spec(&self) -> Option<&PartitionSpecRef> {
-        if self.default_spec_id == DEFAULT_SPEC_ID {
-            self.partition_spec_by_id(DEFAULT_SPEC_ID)
+        if self.default_spec_id == DEFAULT_PARTITION_SPEC_ID {
+            self.partition_spec_by_id(DEFAULT_PARTITION_SPEC_ID)
         } else {
             Some(
                 self.partition_spec_by_id(self.default_spec_id)
@@ -308,9 +308,9 @@ impl TableMetadataBuilder {
                 ))
             }
             None => HashMap::from([(
-                DEFAULT_SPEC_ID,
+                DEFAULT_PARTITION_SPEC_ID,
                 Arc::new(PartitionSpec {
-                    spec_id: DEFAULT_SPEC_ID,
+                    spec_id: DEFAULT_PARTITION_SPEC_ID,
                     fields: vec![],
                 }),
             )]),
@@ -347,7 +347,7 @@ impl TableMetadataBuilder {
             current_schema_id: schema.schema_id(),
             schemas: HashMap::from([(schema.schema_id(), Arc::new(schema))]),
             partition_specs,
-            default_spec_id: DEFAULT_SPEC_ID,
+            default_spec_id: DEFAULT_PARTITION_SPEC_ID,
             last_partition_id: 0,
             properties,
             current_snapshot_id: None,
@@ -391,8 +391,8 @@ pub(super) mod _serde {
     use uuid::Uuid;
 
     use super::{
-        FormatVersion, MetadataLog, SnapshotLog, TableMetadata, 
DEFAULT_SORT_ORDER_ID,
-        DEFAULT_SPEC_ID, MAIN_BRANCH,
+        FormatVersion, MetadataLog, SnapshotLog, TableMetadata, 
DEFAULT_PARTITION_SPEC_ID,
+        DEFAULT_SORT_ORDER_ID, MAIN_BRANCH,
     };
     use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
     use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2};
@@ -568,7 +568,7 @@ pub(super) mod _serde {
                     value
                         .partition_specs
                         .into_iter()
-                        .map(|x| (x.spec_id, Arc::new(x))),
+                        .map(|x| (x.spec_id(), Arc::new(x))),
                 ),
                 default_spec_id: value.default_spec_id,
                 last_partition_id: value.last_partition_id,
@@ -643,12 +643,12 @@ pub(super) mod _serde {
                     .partition_specs
                     .unwrap_or_else(|| {
                         vec![PartitionSpec {
-                            spec_id: DEFAULT_SPEC_ID,
+                            spec_id: DEFAULT_PARTITION_SPEC_ID,
                             fields: value.partition_spec,
                         }]
                     })
                     .into_iter()
-                    .map(|x| (x.spec_id, Arc::new(x))),
+                    .map(|x| (x.spec_id(), Arc::new(x))),
             );
             Ok(TableMetadata {
                 format_version: FormatVersion::V1,
@@ -808,7 +808,7 @@ pub(super) mod _serde {
                 partition_spec: v
                     .partition_specs
                     .get(&v.default_spec_id)
-                    .map(|x| x.fields.clone())
+                    .map(|x| x.fields().to_vec())
                     .unwrap_or_default(),
                 partition_specs: Some(
                     v.partition_specs
@@ -935,7 +935,7 @@ mod tests {
     use crate::spec::{
         NestedField, NullOrder, Operation, PartitionField, PartitionSpec, 
PrimitiveType, Schema,
         Snapshot, SnapshotReference, SnapshotRetention, SortDirection, 
SortField, SortOrder,
-        Summary, Transform, Type,
+        Summary, Transform, Type, UnboundPartitionField,
     };
     use crate::TableCreation;
 
@@ -1020,16 +1020,15 @@ mod tests {
             .build()
             .unwrap();
 
-        let partition_spec = PartitionSpec::builder()
-            .with_spec_id(1)
-            .with_partition_field(PartitionField {
+        let partition_spec = PartitionSpec {
+            spec_id: 1,
+            fields: vec![PartitionField {
                 name: "ts_day".to_string(),
                 transform: Transform::Day,
                 source_id: 4,
                 field_id: 1000,
-            })
-            .build()
-            .unwrap();
+            }],
+        };
 
         let expected = TableMetadata {
             format_version: FormatVersion::V2,
@@ -1179,14 +1178,10 @@ mod tests {
             .build()
             .unwrap();
 
-        let partition_spec = PartitionSpec::builder()
+        let partition_spec = PartitionSpec::builder(&schema)
             .with_spec_id(0)
-            .with_partition_field(PartitionField {
-                name: "vendor_id".to_string(),
-                transform: Transform::Identity,
-                source_id: 1,
-                field_id: 1000,
-            })
+            .add_partition_field("vendor_id", "vendor_id", Transform::Identity)
+            .unwrap()
             .build()
             .unwrap();
 
@@ -1292,14 +1287,15 @@ mod tests {
             .build()
             .unwrap();
 
-        let partition_spec = PartitionSpec::builder()
+        let partition_spec = PartitionSpec::builder(&schema1)
             .with_spec_id(0)
-            .with_partition_field(PartitionField {
+            .add_unbound_field(UnboundPartitionField {
                 name: "x".to_string(),
                 transform: Transform::Identity,
                 source_id: 1,
-                field_id: 1000,
+                partition_id: Some(1000),
             })
+            .unwrap()
             .build()
             .unwrap();
 
@@ -1414,14 +1410,15 @@ mod tests {
             .build()
             .unwrap();
 
-        let partition_spec = PartitionSpec::builder()
+        let partition_spec = PartitionSpec::builder(&schema)
             .with_spec_id(0)
-            .with_partition_field(PartitionField {
+            .add_unbound_field(UnboundPartitionField {
                 name: "x".to_string(),
                 transform: Transform::Identity,
                 source_id: 1,
-                field_id: 1000,
+                partition_id: Some(1000),
             })
+            .unwrap()
             .build()
             .unwrap();
 
@@ -1493,14 +1490,15 @@ mod tests {
             .build()
             .unwrap();
 
-        let partition_spec = PartitionSpec::builder()
+        let partition_spec = PartitionSpec::builder(&schema)
             .with_spec_id(0)
-            .with_partition_field(PartitionField {
+            .add_unbound_field(UnboundPartitionField {
                 name: "x".to_string(),
                 transform: Transform::Identity,
                 source_id: 1,
-                field_id: 1000,
+                partition_id: Some(1000),
             })
+            .unwrap()
             .build()
             .unwrap();
 
@@ -1686,10 +1684,12 @@ mod tests {
             table_metadata.partition_specs,
             HashMap::from([(
                 0,
-                Arc::new(PartitionSpec {
-                    spec_id: 0,
-                    fields: vec![]
-                })
+                Arc::new(
+                    
PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap())
+                        .with_spec_id(0)
+                        .build()
+                        .unwrap()
+                )
             )])
         );
         assert_eq!(

Reply via email to