This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 6e0bcf56 feat: Safer PartitionSpec & SchemalessPartitionSpec (#645)
6e0bcf56 is described below

commit 6e0bcf56028e0d20d5ceeedf87dbb3db7c929ee3
Author: Christian <[email protected]>
AuthorDate: Sat Nov 2 21:10:03 2024 -0700

    feat: Safer PartitionSpec & SchemalessPartitionSpec (#645)
    
    * SchemalessPartitionSpec
    
    * Traits -> Enum
    
    * Remove PartitionSpec enum
    
    * Address comments
---
 crates/catalog/memory/src/catalog.rs               |   6 +-
 crates/catalog/sql/src/catalog.rs                  |   7 +-
 .../src/expr/visitors/expression_evaluator.rs      | 136 +++---
 .../expr/visitors/inclusive_metrics_evaluator.rs   |  13 +-
 .../src/expr/visitors/inclusive_projection.rs      | 155 +++++--
 crates/iceberg/src/io/object_cache.rs              |   2 +-
 crates/iceberg/src/scan.rs                         |   8 +-
 crates/iceberg/src/spec/manifest.rs                | 516 ++++++++++-----------
 crates/iceberg/src/spec/partition.rs               | 433 ++++++++++++-----
 crates/iceberg/src/spec/table_metadata.rs          | 315 ++++++++-----
 .../src/writer/file_writer/location_generator.rs   |   5 +-
 11 files changed, 969 insertions(+), 627 deletions(-)

diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs
index 1da04482..e4192aae 100644
--- a/crates/catalog/memory/src/catalog.rs
+++ b/crates/catalog/memory/src/catalog.rs
@@ -283,7 +283,7 @@ mod tests {
     use std::iter::FromIterator;
 
     use iceberg::io::FileIOBuilder;
-    use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
+    use iceberg::spec::{BoundPartitionSpec, NestedField, PrimitiveType, Schema, SortOrder, Type};
     use regex::Regex;
     use tempfile::TempDir;
 
@@ -355,7 +355,7 @@ mod tests {
 
         assert_eq!(metadata.current_schema().as_ref(), expected_schema);
 
-        let expected_partition_spec = PartitionSpec::builder(expected_schema)
+        let expected_partition_spec = BoundPartitionSpec::builder((*expected_schema).clone())
             .with_spec_id(0)
             .build()
             .unwrap();
@@ -365,7 +365,7 @@ mod tests {
                 .partition_specs_iter()
                 .map(|p| p.as_ref())
                 .collect_vec(),
-            vec![&expected_partition_spec]
+            vec![&expected_partition_spec.into_schemaless()]
         );
 
         let expected_sorted_order = SortOrder::builder()
diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs
index b7976d9d..abf22ffd 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -781,7 +781,7 @@ mod tests {
     use std::hash::Hash;
 
     use iceberg::io::FileIOBuilder;
-    use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
+    use iceberg::spec::{BoundPartitionSpec, NestedField, PrimitiveType, Schema, SortOrder, Type};
     use iceberg::table::Table;
     use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
     use itertools::Itertools;
@@ -874,10 +874,11 @@ mod tests {
 
         assert_eq!(metadata.current_schema().as_ref(), expected_schema);
 
-        let expected_partition_spec = PartitionSpec::builder(expected_schema)
+        let expected_partition_spec = BoundPartitionSpec::builder(expected_schema.clone())
             .with_spec_id(0)
             .build()
-            .unwrap();
+            .unwrap()
+            .into_schemaless();
 
         assert_eq!(
             metadata
diff --git a/crates/iceberg/src/expr/visitors/expression_evaluator.rs b/crates/iceberg/src/expr/visitors/expression_evaluator.rs
index 8f3c2971..2add5761 100644
--- a/crates/iceberg/src/expr/visitors/expression_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/expression_evaluator.rs
@@ -258,15 +258,13 @@ mod tests {
         UnaryExpression,
     };
     use crate::spec::{
-        DataContentType, DataFile, DataFileFormat, Datum, Literal, NestedField, PartitionSpec,
-        PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform, Type,
+        BoundPartitionSpec, BoundPartitionSpecRef, DataContentType, DataFile, DataFileFormat,
+        Datum, Literal, NestedField, PrimitiveType, Schema, Struct, Transform, Type,
         UnboundPartitionField,
     };
     use crate::Result;
 
-    fn create_schema_and_partition_spec(
-        r#type: PrimitiveType,
-    ) -> Result<(SchemaRef, PartitionSpecRef)> {
+    fn create_partition_spec(r#type: PrimitiveType) -> Result<BoundPartitionSpecRef> {
         let schema = Schema::builder()
             .with_fields(vec![Arc::new(NestedField::optional(
                 1,
@@ -275,7 +273,7 @@ mod tests {
             ))])
             .build()?;
 
-        let spec = PartitionSpec::builder(&schema)
+        let spec = BoundPartitionSpec::builder(schema.clone())
             .with_spec_id(1)
             .add_unbound_fields(vec![UnboundPartitionField::builder()
                 .source_id(1)
@@ -287,16 +285,15 @@ mod tests {
             .build()
             .unwrap();
 
-        Ok((Arc::new(schema), Arc::new(spec)))
+        Ok(Arc::new(spec))
     }
 
     fn create_partition_filter(
-        schema: &Schema,
-        partition_spec: PartitionSpecRef,
+        partition_spec: BoundPartitionSpecRef,
         predicate: &BoundPredicate,
         case_sensitive: bool,
     ) -> Result<BoundPredicate> {
-        let partition_type = partition_spec.partition_type(schema)?;
+        let partition_type = partition_spec.partition_type();
         let partition_fields = partition_type.fields().to_owned();
 
         let partition_schema = Schema::builder()
@@ -304,7 +301,8 @@ mod tests {
             .with_fields(partition_fields)
             .build()?;
 
-        let mut inclusive_projection = InclusiveProjection::new(partition_spec);
+        let mut inclusive_projection =
+            InclusiveProjection::new((*partition_spec).clone().into_schemaless().into());
 
         let partition_filter = inclusive_projection
             .project(predicate)?
@@ -315,13 +313,11 @@ mod tests {
     }
 
     fn create_expression_evaluator(
-        schema: &Schema,
-        partition_spec: PartitionSpecRef,
+        partition_spec: BoundPartitionSpecRef,
         predicate: &BoundPredicate,
         case_sensitive: bool,
     ) -> Result<ExpressionEvaluator> {
-        let partition_filter =
-            create_partition_filter(schema, partition_spec, predicate, case_sensitive)?;
+        let partition_filter = create_partition_filter(partition_spec, predicate, case_sensitive)?;
 
         Ok(ExpressionEvaluator::new(partition_filter))
     }
@@ -375,7 +371,7 @@ mod tests {
     #[test]
     fn test_expr_or() -> Result<()> {
         let case_sensitive = true;
-        let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::Float)?;
+        let partition_spec = create_partition_spec(PrimitiveType::Float)?;
 
         let predicate = Predicate::Binary(BinaryExpression::new(
             PredicateOperator::LessThan,
@@ -387,10 +383,10 @@ mod tests {
             Reference::new("a"),
             Datum::float(0.4),
         )))
-        .bind(schema.clone(), case_sensitive)?;
+        .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
 
         let expression_evaluator =
-            create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?;
+            create_expression_evaluator(partition_spec, &predicate, case_sensitive)?;
 
         let data_file = create_data_file_float();
 
@@ -404,7 +400,7 @@ mod tests {
     #[test]
     fn test_expr_and() -> Result<()> {
         let case_sensitive = true;
-        let (schema, partition_spec) = 
create_schema_and_partition_spec(PrimitiveType::Float)?;
+        let partition_spec = create_partition_spec(PrimitiveType::Float)?;
 
         let predicate = Predicate::Binary(BinaryExpression::new(
             PredicateOperator::LessThan,
@@ -416,10 +412,10 @@ mod tests {
             Reference::new("a"),
             Datum::float(0.4),
         )))
-        .bind(schema.clone(), case_sensitive)?;
+        .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
 
         let expression_evaluator =
-            create_expression_evaluator(&schema, partition_spec, &predicate, 
case_sensitive)?;
+            create_expression_evaluator(partition_spec, &predicate, 
case_sensitive)?;
 
         let data_file = create_data_file_float();
 
@@ -433,17 +429,17 @@ mod tests {
     #[test]
     fn test_expr_not_in() -> Result<()> {
         let case_sensitive = true;
-        let (schema, partition_spec) = 
create_schema_and_partition_spec(PrimitiveType::Float)?;
+        let partition_spec = create_partition_spec(PrimitiveType::Float)?;
 
         let predicate = Predicate::Set(SetExpression::new(
             PredicateOperator::NotIn,
             Reference::new("a"),
             FnvHashSet::from_iter([Datum::float(0.9), Datum::float(1.2), 
Datum::float(2.4)]),
         ))
-        .bind(schema.clone(), case_sensitive)?;
+        .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
 
         let expression_evaluator =
-            create_expression_evaluator(&schema, partition_spec, &predicate, 
case_sensitive)?;
+            create_expression_evaluator(partition_spec, &predicate, 
case_sensitive)?;
 
         let data_file = create_data_file_float();
 
@@ -457,17 +453,17 @@ mod tests {
     #[test]
     fn test_expr_in() -> Result<()> {
         let case_sensitive = true;
-        let (schema, partition_spec) = 
create_schema_and_partition_spec(PrimitiveType::Float)?;
+        let partition_spec = create_partition_spec(PrimitiveType::Float)?;
 
         let predicate = Predicate::Set(SetExpression::new(
             PredicateOperator::In,
             Reference::new("a"),
             FnvHashSet::from_iter([Datum::float(1.0), Datum::float(1.2), 
Datum::float(2.4)]),
         ))
-        .bind(schema.clone(), case_sensitive)?;
+        .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
 
         let expression_evaluator =
-            create_expression_evaluator(&schema, partition_spec, &predicate, 
case_sensitive)?;
+            create_expression_evaluator(partition_spec, &predicate, 
case_sensitive)?;
 
         let data_file = create_data_file_float();
 
@@ -481,17 +477,17 @@ mod tests {
     #[test]
     fn test_expr_not_starts_with() -> Result<()> {
         let case_sensitive = true;
-        let (schema, partition_spec) = 
create_schema_and_partition_spec(PrimitiveType::String)?;
+        let partition_spec = create_partition_spec(PrimitiveType::String)?;
 
         let predicate = Predicate::Binary(BinaryExpression::new(
             PredicateOperator::NotStartsWith,
             Reference::new("a"),
             Datum::string("not"),
         ))
-        .bind(schema.clone(), case_sensitive)?;
+        .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
 
         let expression_evaluator =
-            create_expression_evaluator(&schema, partition_spec, &predicate, 
case_sensitive)?;
+            create_expression_evaluator(partition_spec, &predicate, 
case_sensitive)?;
 
         let data_file = create_data_file_string();
 
@@ -505,17 +501,17 @@ mod tests {
     #[test]
     fn test_expr_starts_with() -> Result<()> {
         let case_sensitive = true;
-        let (schema, partition_spec) = 
create_schema_and_partition_spec(PrimitiveType::String)?;
+        let partition_spec = create_partition_spec(PrimitiveType::String)?;
 
         let predicate = Predicate::Binary(BinaryExpression::new(
             PredicateOperator::StartsWith,
             Reference::new("a"),
             Datum::string("test"),
         ))
-        .bind(schema.clone(), case_sensitive)?;
+        .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
 
         let expression_evaluator =
-            create_expression_evaluator(&schema, partition_spec, &predicate, 
case_sensitive)?;
+            create_expression_evaluator(partition_spec, &predicate, 
case_sensitive)?;
 
         let data_file = create_data_file_string();
 
@@ -529,17 +525,17 @@ mod tests {
     #[test]
     fn test_expr_not_eq() -> Result<()> {
         let case_sensitive = true;
-        let (schema, partition_spec) = 
create_schema_and_partition_spec(PrimitiveType::Float)?;
+        let partition_spec = create_partition_spec(PrimitiveType::Float)?;
 
         let predicate = Predicate::Binary(BinaryExpression::new(
             PredicateOperator::NotEq,
             Reference::new("a"),
             Datum::float(0.9),
         ))
-        .bind(schema.clone(), case_sensitive)?;
+        .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
 
         let expression_evaluator =
-            create_expression_evaluator(&schema, partition_spec, &predicate, 
case_sensitive)?;
+            create_expression_evaluator(partition_spec, &predicate, 
case_sensitive)?;
 
         let data_file = create_data_file_float();
 
@@ -553,17 +549,17 @@ mod tests {
     #[test]
     fn test_expr_eq() -> Result<()> {
         let case_sensitive = true;
-        let (schema, partition_spec) = 
create_schema_and_partition_spec(PrimitiveType::Float)?;
+        let partition_spec = create_partition_spec(PrimitiveType::Float)?;
 
         let predicate = Predicate::Binary(BinaryExpression::new(
             PredicateOperator::Eq,
             Reference::new("a"),
             Datum::float(1.0),
         ))
-        .bind(schema.clone(), case_sensitive)?;
+        .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
 
         let expression_evaluator =
-            create_expression_evaluator(&schema, partition_spec, &predicate, 
case_sensitive)?;
+            create_expression_evaluator(partition_spec, &predicate, 
case_sensitive)?;
 
         let data_file = create_data_file_float();
 
@@ -577,17 +573,17 @@ mod tests {
     #[test]
     fn test_expr_greater_than_or_eq() -> Result<()> {
         let case_sensitive = true;
-        let (schema, partition_spec) = 
create_schema_and_partition_spec(PrimitiveType::Float)?;
+        let partition_spec = create_partition_spec(PrimitiveType::Float)?;
 
         let predicate = Predicate::Binary(BinaryExpression::new(
             PredicateOperator::GreaterThanOrEq,
             Reference::new("a"),
             Datum::float(1.0),
         ))
-        .bind(schema.clone(), case_sensitive)?;
+        .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
 
         let expression_evaluator =
-            create_expression_evaluator(&schema, partition_spec, &predicate, 
case_sensitive)?;
+            create_expression_evaluator(partition_spec, &predicate, 
case_sensitive)?;
 
         let data_file = create_data_file_float();
 
@@ -601,17 +597,17 @@ mod tests {
     #[test]
     fn test_expr_greater_than() -> Result<()> {
         let case_sensitive = true;
-        let (schema, partition_spec) = 
create_schema_and_partition_spec(PrimitiveType::Float)?;
+        let partition_spec = create_partition_spec(PrimitiveType::Float)?;
 
         let predicate = Predicate::Binary(BinaryExpression::new(
             PredicateOperator::GreaterThan,
             Reference::new("a"),
             Datum::float(0.9),
         ))
-        .bind(schema.clone(), case_sensitive)?;
+        .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
 
         let expression_evaluator =
-            create_expression_evaluator(&schema, partition_spec, &predicate, 
case_sensitive)?;
+            create_expression_evaluator(partition_spec, &predicate, 
case_sensitive)?;
 
         let data_file = create_data_file_float();
 
@@ -625,17 +621,17 @@ mod tests {
     #[test]
     fn test_expr_less_than_or_eq() -> Result<()> {
         let case_sensitive = true;
-        let (schema, partition_spec) = 
create_schema_and_partition_spec(PrimitiveType::Float)?;
+        let partition_spec = create_partition_spec(PrimitiveType::Float)?;
 
         let predicate = Predicate::Binary(BinaryExpression::new(
             PredicateOperator::LessThanOrEq,
             Reference::new("a"),
             Datum::float(1.0),
         ))
-        .bind(schema.clone(), case_sensitive)?;
+        .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
 
         let expression_evaluator =
-            create_expression_evaluator(&schema, partition_spec, &predicate, 
case_sensitive)?;
+            create_expression_evaluator(partition_spec, &predicate, 
case_sensitive)?;
 
         let data_file = create_data_file_float();
 
@@ -649,17 +645,17 @@ mod tests {
     #[test]
     fn test_expr_less_than() -> Result<()> {
         let case_sensitive = true;
-        let (schema, partition_spec) = 
create_schema_and_partition_spec(PrimitiveType::Float)?;
+        let partition_spec = create_partition_spec(PrimitiveType::Float)?;
 
         let predicate = Predicate::Binary(BinaryExpression::new(
             PredicateOperator::LessThan,
             Reference::new("a"),
             Datum::float(1.1),
         ))
-        .bind(schema.clone(), case_sensitive)?;
+        .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
 
         let expression_evaluator =
-            create_expression_evaluator(&schema, partition_spec, &predicate, 
case_sensitive)?;
+            create_expression_evaluator(partition_spec, &predicate, 
case_sensitive)?;
 
         let data_file = create_data_file_float();
 
@@ -673,15 +669,15 @@ mod tests {
     #[test]
     fn test_expr_is_not_nan() -> Result<()> {
         let case_sensitive = true;
-        let (schema, partition_spec) = 
create_schema_and_partition_spec(PrimitiveType::Float)?;
+        let partition_spec = create_partition_spec(PrimitiveType::Float)?;
         let predicate = Predicate::Unary(UnaryExpression::new(
             PredicateOperator::NotNan,
             Reference::new("a"),
         ))
-        .bind(schema.clone(), case_sensitive)?;
+        .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
 
         let expression_evaluator =
-            create_expression_evaluator(&schema, partition_spec, &predicate, 
case_sensitive)?;
+            create_expression_evaluator(partition_spec, &predicate, 
case_sensitive)?;
 
         let data_file = create_data_file_float();
 
@@ -695,15 +691,15 @@ mod tests {
     #[test]
     fn test_expr_is_nan() -> Result<()> {
         let case_sensitive = true;
-        let (schema, partition_spec) = 
create_schema_and_partition_spec(PrimitiveType::Float)?;
+        let partition_spec = create_partition_spec(PrimitiveType::Float)?;
         let predicate = Predicate::Unary(UnaryExpression::new(
             PredicateOperator::IsNan,
             Reference::new("a"),
         ))
-        .bind(schema.clone(), case_sensitive)?;
+        .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
 
         let expression_evaluator =
-            create_expression_evaluator(&schema, partition_spec, &predicate, 
case_sensitive)?;
+            create_expression_evaluator(partition_spec, &predicate, 
case_sensitive)?;
 
         let data_file = create_data_file_float();
 
@@ -717,15 +713,15 @@ mod tests {
     #[test]
     fn test_expr_is_not_null() -> Result<()> {
         let case_sensitive = true;
-        let (schema, partition_spec) = 
create_schema_and_partition_spec(PrimitiveType::Float)?;
+        let partition_spec = create_partition_spec(PrimitiveType::Float)?;
         let predicate = Predicate::Unary(UnaryExpression::new(
             PredicateOperator::NotNull,
             Reference::new("a"),
         ))
-        .bind(schema.clone(), case_sensitive)?;
+        .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
 
         let expression_evaluator =
-            create_expression_evaluator(&schema, partition_spec, &predicate, 
case_sensitive)?;
+            create_expression_evaluator(partition_spec, &predicate, 
case_sensitive)?;
 
         let data_file = create_data_file_float();
 
@@ -739,15 +735,15 @@ mod tests {
     #[test]
     fn test_expr_is_null() -> Result<()> {
         let case_sensitive = true;
-        let (schema, partition_spec) = 
create_schema_and_partition_spec(PrimitiveType::Float)?;
+        let partition_spec = create_partition_spec(PrimitiveType::Float)?;
         let predicate = Predicate::Unary(UnaryExpression::new(
             PredicateOperator::IsNull,
             Reference::new("a"),
         ))
-        .bind(schema.clone(), case_sensitive)?;
+        .bind(partition_spec.schema_ref().clone(), case_sensitive)?;
 
         let expression_evaluator =
-            create_expression_evaluator(&schema, partition_spec, &predicate, 
case_sensitive)?;
+            create_expression_evaluator(partition_spec, &predicate, 
case_sensitive)?;
 
         let data_file = create_data_file_float();
 
@@ -761,11 +757,12 @@ mod tests {
     #[test]
     fn test_expr_always_false() -> Result<()> {
         let case_sensitive = true;
-        let (schema, partition_spec) = 
create_schema_and_partition_spec(PrimitiveType::Float)?;
-        let predicate = Predicate::AlwaysFalse.bind(schema.clone(), 
case_sensitive)?;
+        let partition_spec = create_partition_spec(PrimitiveType::Float)?;
+        let predicate =
+            Predicate::AlwaysFalse.bind(partition_spec.schema_ref().clone(), 
case_sensitive)?;
 
         let expression_evaluator =
-            create_expression_evaluator(&schema, partition_spec, &predicate, 
case_sensitive)?;
+            create_expression_evaluator(partition_spec, &predicate, 
case_sensitive)?;
 
         let data_file = create_data_file_float();
 
@@ -779,11 +776,12 @@ mod tests {
     #[test]
     fn test_expr_always_true() -> Result<()> {
         let case_sensitive = true;
-        let (schema, partition_spec) = 
create_schema_and_partition_spec(PrimitiveType::Float)?;
-        let predicate = Predicate::AlwaysTrue.bind(schema.clone(), 
case_sensitive)?;
+        let partition_spec = create_partition_spec(PrimitiveType::Float)?;
+        let predicate =
+            Predicate::AlwaysTrue.bind(partition_spec.schema_ref().clone(), 
case_sensitive)?;
 
         let expression_evaluator =
-            create_expression_evaluator(&schema, partition_spec, &predicate, 
case_sensitive)?;
+            create_expression_evaluator(partition_spec, &predicate, 
case_sensitive)?;
 
         let data_file = create_data_file_float();
 
diff --git a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
index a2ee4722..1cdc7577 100644
--- a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
+++ b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
@@ -495,7 +495,7 @@ mod test {
         UnaryExpression,
     };
     use crate::spec::{
-        DataContentType, DataFile, DataFileFormat, Datum, NestedField, PartitionSpec,
+        BoundPartitionSpec, DataContentType, DataFile, DataFileFormat, Datum, NestedField,
         PrimitiveType, Schema, Struct, Transform, Type, UnboundPartitionField,
     };
 
@@ -504,10 +504,10 @@ mod test {
 
     #[test]
     fn test_data_file_no_partitions() {
-        let (table_schema_ref, _partition_spec_ref) = create_test_schema_and_partition_spec();
+        let partition_spec_ref = create_test_partition_spec();
 
         let partition_filter = Predicate::AlwaysTrue
-            .bind(table_schema_ref.clone(), false)
+            .bind(partition_spec_ref.schema_ref().clone(), false)
             .unwrap();
 
         let case_sensitive = false;
@@ -1645,7 +1645,7 @@ mod test {
         assert!(result, "Should read: NotIn on no nulls column");
     }
 
-    fn create_test_schema_and_partition_spec() -> (Arc<Schema>, Arc<PartitionSpec>) {
+    fn create_test_partition_spec() -> Arc<BoundPartitionSpec> {
         let table_schema = Schema::builder()
             .with_fields(vec![Arc::new(NestedField::optional(
                 1,
@@ -1656,7 +1656,7 @@ mod test {
             .unwrap();
         let table_schema_ref = Arc::new(table_schema);
 
-        let partition_spec = PartitionSpec::builder(&table_schema_ref)
+        let partition_spec = BoundPartitionSpec::builder(table_schema_ref.clone())
             .with_spec_id(1)
             .add_unbound_fields(vec![UnboundPartitionField::builder()
                 .source_id(1)
@@ -1667,8 +1667,7 @@ mod test {
             .unwrap()
             .build()
             .unwrap();
-        let partition_spec_ref = Arc::new(partition_spec);
-        (table_schema_ref, partition_spec_ref)
+        Arc::new(partition_spec)
     }
 
     fn not_null(reference: &str) -> BoundPredicate {
diff --git a/crates/iceberg/src/expr/visitors/inclusive_projection.rs b/crates/iceberg/src/expr/visitors/inclusive_projection.rs
index 2087207e..7c6e0b2d 100644
--- a/crates/iceberg/src/expr/visitors/inclusive_projection.rs
+++ b/crates/iceberg/src/expr/visitors/inclusive_projection.rs
@@ -21,16 +21,16 @@ use fnv::FnvHashSet;
 
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
 use crate::expr::{BoundPredicate, BoundReference, Predicate};
-use crate::spec::{Datum, PartitionField, PartitionSpecRef};
+use crate::spec::{Datum, PartitionField, SchemalessPartitionSpecRef};
 use crate::Error;
 
 pub(crate) struct InclusiveProjection {
-    partition_spec: PartitionSpecRef,
+    partition_spec: SchemalessPartitionSpecRef,
     cached_parts: HashMap<i32, Vec<PartitionField>>,
 }
 
 impl InclusiveProjection {
-    pub(crate) fn new(partition_spec: PartitionSpecRef) -> Self {
+    pub(crate) fn new(partition_spec: SchemalessPartitionSpecRef) -> Self {
         Self {
             partition_spec,
             cached_parts: HashMap::new(),
@@ -235,7 +235,7 @@ mod tests {
     use crate::expr::visitors::inclusive_projection::InclusiveProjection;
     use crate::expr::{Bind, Predicate, Reference};
     use crate::spec::{
-        Datum, NestedField, PartitionField, PartitionSpec, PrimitiveType, Schema, Transform, Type,
+        BoundPartitionSpec, Datum, NestedField, PrimitiveType, Schema, Transform, Type,
         UnboundPartitionField,
     };
 
@@ -265,13 +265,14 @@ mod tests {
     #[test]
     fn test_inclusive_projection_logic_ops() {
         let schema = build_test_schema();
+        let arc_schema = Arc::new(schema);
 
-        let partition_spec = PartitionSpec::builder(&schema)
+        let partition_spec = BoundPartitionSpec::builder(arc_schema.clone())
             .with_spec_id(1)
             .build()
-            .unwrap();
+            .unwrap()
+            .into_schemaless();
 
-        let arc_schema = Arc::new(schema);
         let arc_partition_spec = Arc::new(partition_spec);
 
         // this predicate contains only logic operators,
@@ -295,8 +296,9 @@ mod tests {
     #[test]
     fn test_inclusive_projection_identity_transform() {
         let schema = build_test_schema();
+        let arc_schema = Arc::new(schema);
 
-        let partition_spec = PartitionSpec::builder(&schema)
+        let partition_spec = BoundPartitionSpec::builder(arc_schema.clone())
             .with_spec_id(1)
             .add_unbound_field(
                 UnboundPartitionField::builder()
@@ -308,9 +310,9 @@ mod tests {
             )
             .unwrap()
             .build()
-            .unwrap();
+            .unwrap()
+            .into_schemaless();
 
-        let arc_schema = Arc::new(schema);
         let arc_partition_spec = Arc::new(partition_spec);
 
         let unbound_predicate = Reference::new("a").less_than(Datum::int(10));
@@ -321,7 +323,7 @@ mod tests {
         // should result in the same Predicate as the original
         // `unbound_predicate`, since we have just a single partition field,
         // and it has an Identity transform
-        let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone());
+        let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec);
         let result = inclusive_projection.project(&bound_predicate).unwrap();
 
         let expected = "a < 10".to_string();
@@ -330,34 +332,95 @@ mod tests {
     }
 
     #[test]
-    fn test_inclusive_projection_date_transforms() {
+    fn test_inclusive_projection_date_year_transform() {
         let schema = build_test_schema();
+        let arc_schema = Arc::new(schema);
+
+        let partition_spec = BoundPartitionSpec::builder(arc_schema.clone())
+            .with_spec_id(1)
+            .add_unbound_fields(vec![UnboundPartitionField {
+                source_id: 2,
+                name: "year".to_string(),
+                field_id: Some(1000),
+                transform: Transform::Year,
+            }])
+            .unwrap()
+            .build()
+            .unwrap()
+            .into_schemaless();
+
+        let arc_partition_spec = Arc::new(partition_spec);
+
+        let unbound_predicate =
+            Reference::new("date").less_than(Datum::date_from_str("2024-01-01").unwrap());
+
+        let bound_predicate = unbound_predicate.bind(arc_schema.clone(), false).unwrap();
+
+        // applying InclusiveProjection to bound_predicate
+        // should result in a predicate that correctly handles
+        // year, month and date
+        let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec);
+        let result = inclusive_projection.project(&bound_predicate).unwrap();
+
+        let expected = "year <= 53".to_string();
+
+        assert_eq!(result.to_string(), expected);
+    }
+
+    #[test]
+    fn test_inclusive_projection_date_month_transform() {
+        let schema = build_test_schema();
+        let arc_schema = Arc::new(schema);
+
+        let partition_spec = BoundPartitionSpec::builder(arc_schema.clone())
+            .with_spec_id(1)
+            .add_unbound_fields(vec![UnboundPartitionField {
+                source_id: 2,
+                name: "month".to_string(),
+                field_id: Some(1000),
+                transform: Transform::Month,
+            }])
+            .unwrap()
+            .build()
+            .unwrap()
+            .into_schemaless();
+
+        let arc_partition_spec = Arc::new(partition_spec);
+
+        let unbound_predicate =
+            Reference::new("date").less_than(Datum::date_from_str("2024-01-01").unwrap());
+
+        let bound_predicate = unbound_predicate.bind(arc_schema.clone(), false).unwrap();
 
-        let partition_spec = PartitionSpec {
-            spec_id: 1,
-            fields: vec![
-                PartitionField {
-                    source_id: 2,
-                    name: "year".to_string(),
-                    field_id: 1000,
-                    transform: Transform::Year,
-                },
-                PartitionField {
-                    source_id: 2,
-                    name: "month".to_string(),
-                    field_id: 1001,
-                    transform: Transform::Month,
-                },
-                PartitionField {
-                    source_id: 2,
-                    name: "day".to_string(),
-                    field_id: 1002,
-                    transform: Transform::Day,
-                },
-            ],
-        };
+        // applying InclusiveProjection to bound_predicate
+        // should result in a predicate that correctly handles
+        // year, month and date
+        let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec);
+        let result = inclusive_projection.project(&bound_predicate).unwrap();
+
+        let expected = "month <= 647".to_string();
+
+        assert_eq!(result.to_string(), expected);
+    }
 
+    #[test]
+    fn test_inclusive_projection_date_day_transform() {
+        let schema = build_test_schema();
         let arc_schema = Arc::new(schema);
+
+        let partition_spec = BoundPartitionSpec::builder(arc_schema.clone())
+            .with_spec_id(1)
+            .add_unbound_fields(vec![UnboundPartitionField {
+                source_id: 2,
+                name: "day".to_string(),
+                field_id: Some(1000),
+                transform: Transform::Day,
+            }])
+            .unwrap()
+            .build()
+            .unwrap()
+            .into_schemaless();
+
         let arc_partition_spec = Arc::new(partition_spec);
 
         let unbound_predicate =
@@ -368,10 +431,10 @@ mod tests {
         // applying InclusiveProjection to bound_predicate
         // should result in a predicate that correctly handles
         // year, month and date
-        let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone());
+        let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec);
         let result = inclusive_projection.project(&bound_predicate).unwrap();
 
-        let expected = "((year <= 53) AND (month <= 647)) AND (day <= 19722)".to_string();
+        let expected = "day <= 19722".to_string();
 
         assert_eq!(result.to_string(), expected);
     }
@@ -379,8 +442,9 @@ mod tests {
     #[test]
     fn test_inclusive_projection_truncate_transform() {
         let schema = build_test_schema();
+        let arc_schema = Arc::new(schema);
 
-        let partition_spec = PartitionSpec::builder(&schema)
+        let partition_spec = BoundPartitionSpec::builder(arc_schema.clone())
             .with_spec_id(1)
             .add_unbound_field(
                 UnboundPartitionField::builder()
@@ -392,9 +456,9 @@ mod tests {
             )
             .unwrap()
             .build()
-            .unwrap();
+            .unwrap()
+            .into_schemaless();
 
-        let arc_schema = Arc::new(schema);
         let arc_partition_spec = Arc::new(partition_spec);
 
         let unbound_predicate = Reference::new("name").starts_with(Datum::string("Testy McTest"));
@@ -408,7 +472,7 @@ mod tests {
         // name that start with "Testy McTest" into a partition
         // for values of name that start with the first four letters
         // of that, ie "Test".
-        let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone());
+        let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec);
         let result = inclusive_projection.project(&bound_predicate).unwrap();
 
         let expected = "name_truncate STARTS WITH \"Test\"".to_string();
@@ -419,8 +483,9 @@ mod tests {
     #[test]
     fn test_inclusive_projection_bucket_transform() {
         let schema = build_test_schema();
+        let arc_schema = Arc::new(schema);
 
-        let partition_spec = PartitionSpec::builder(&schema)
+        let partition_spec = BoundPartitionSpec::builder(arc_schema.clone())
             .with_spec_id(1)
             .add_unbound_field(
                 UnboundPartitionField::builder()
@@ -432,9 +497,9 @@ mod tests {
             )
             .unwrap()
             .build()
-            .unwrap();
+            .unwrap()
+            .into_schemaless();
 
-        let arc_schema = Arc::new(schema);
         let arc_partition_spec = Arc::new(partition_spec);
 
         let unbound_predicate = Reference::new("a").equal_to(Datum::int(10));
@@ -445,7 +510,7 @@ mod tests {
         // should result in the "a = 10" predicate being
         // transformed into "a = 2", since 10 gets bucketed
         // to 2 with a Bucket(7) partition
-        let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone());
+        let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec);
         let result = inclusive_projection.project(&bound_predicate).unwrap();
 
         let expected = "a_bucket[7] = 2".to_string();
diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs
index 731072a5..35b6a2c9 100644
--- a/crates/iceberg/src/io/object_cache.rs
+++ b/crates/iceberg/src/io/object_cache.rs
@@ -262,7 +262,7 @@ mod tests {
             )
             .write(Manifest::new(
                 ManifestMetadata::builder()
-                    .schema((*current_schema).clone())
+                    .schema(current_schema.clone())
                     .content(ManifestContentType::Data)
                     .format_version(FormatVersion::V2)
                     .partition_spec((**current_partition_spec).clone())
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index d0355454..89e8846f 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -706,7 +706,7 @@ impl PartitionFilterCache {
         &self,
         spec_id: i32,
         table_metadata: &TableMetadataRef,
-        schema: &SchemaRef,
+        schema: &Schema,
         case_sensitive: bool,
         filter: BoundPredicate,
     ) -> Result<Arc<BoundPredicate>> {
@@ -732,11 +732,11 @@ impl PartitionFilterCache {
                 format!("Could not find partition spec for id {}", spec_id),
             ))?;
 
-        let partition_type = partition_spec.partition_type(schema.as_ref())?;
+        let partition_type = partition_spec.partition_type(schema)?;
         let partition_fields = partition_type.fields().to_owned();
         let partition_schema = Arc::new(
             Schema::builder()
-                .with_schema_id(partition_spec.spec_id)
+                .with_schema_id(partition_spec.spec_id())
                 .with_fields(partition_fields)
                 .build()?,
         );
@@ -1057,7 +1057,7 @@ mod tests {
             )
             .write(Manifest::new(
                 ManifestMetadata::builder()
-                    .schema((*current_schema).clone())
+                    .schema(current_schema.clone())
                     .content(ManifestContentType::Data)
                     .format_version(FormatVersion::V2)
                     .partition_spec((**current_partition_spec).clone())
diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs
index 997908d5..085200b7 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -30,8 +30,8 @@ use typed_builder::TypedBuilder;
 
 use self::_const_schema::{manifest_schema_v1, manifest_schema_v2};
 use super::{
-    Datum, FieldSummary, FormatVersion, ManifestContentType, ManifestFile, PartitionSpec, Schema,
-    SchemaId, Struct, INITIAL_SEQUENCE_NUMBER, UNASSIGNED_SEQUENCE_NUMBER,
+    BoundPartitionSpec, Datum, FieldSummary, FormatVersion, ManifestContentType, ManifestFile,
+    Schema, SchemaId, SchemaRef, Struct, INITIAL_SEQUENCE_NUMBER, UNASSIGNED_SEQUENCE_NUMBER,
 };
 use crate::error::Result;
 use crate::io::OutputFile;
@@ -55,7 +55,7 @@ impl Manifest {
         let metadata = ManifestMetadata::parse(meta)?;
 
         // Parse manifest entries
-        let partition_type = metadata.partition_spec.partition_type(&metadata.schema)?;
+        let partition_type = metadata.partition_spec.partition_type();
 
         let entries = match metadata.format_version {
             FormatVersion::V1 => {
@@ -65,7 +65,7 @@ impl Manifest {
                     .into_iter()
                     .map(|value| {
                         from_value::<_serde::ManifestEntryV1>(&value?)?
-                            .try_into(&partition_type, &metadata.schema)
+                            .try_into(partition_type, &metadata.schema)
                     })
                     .collect::<Result<Vec<_>>>()?
             }
@@ -76,7 +76,7 @@ impl Manifest {
                     .into_iter()
                     .map(|value| {
                         from_value::<_serde::ManifestEntryV2>(&value?)?
-                            .try_into(&partition_type, &metadata.schema)
+                            .try_into(partition_type, &metadata.schema)
                     })
                     .collect::<Result<Vec<_>>>()?
             }
@@ -206,10 +206,7 @@ impl ManifestWriter {
     /// Write a manifest.
     pub async fn write(mut self, manifest: Manifest) -> Result<ManifestFile> {
         // Create the avro writer
-        let partition_type = manifest
-            .metadata
-            .partition_spec
-            .partition_type(&manifest.metadata.schema)?;
+        let partition_type = manifest.metadata.partition_spec.partition_type();
         let table_schema = &manifest.metadata.schema;
         let avro_schema = match manifest.metadata.format_version {
             FormatVersion::V1 => manifest_schema_v1(partition_type.clone())?,
@@ -284,12 +281,12 @@ impl ManifestWriter {
             let value = match manifest.metadata.format_version {
                 FormatVersion::V1 => to_value(_serde::ManifestEntryV1::try_from(
                     (*entry).clone(),
-                    &partition_type,
+                    partition_type,
                 )?)?
                 .resolve(&avro_schema)?,
                 FormatVersion::V2 => to_value(_serde::ManifestEntryV2::try_from(
                     (*entry).clone(),
-                    &partition_type,
+                    partition_type,
                 )?)?
                 .resolve(&avro_schema)?,
             };
@@ -705,11 +702,11 @@ mod _const_schema {
 pub struct ManifestMetadata {
     /// The table schema at the time the manifest
     /// was written
-    schema: Schema,
+    schema: SchemaRef,
     /// ID of the schema used to write the manifest as a string
     schema_id: SchemaId,
     /// The partition spec used  to write the manifest
-    partition_spec: PartitionSpec,
+    partition_spec: BoundPartitionSpec,
     /// Table format version number of the manifest as a string
     format_version: FormatVersion,
     /// Type of content files tracked by the manifest: “data” or “deletes”
@@ -719,7 +716,7 @@ pub struct ManifestMetadata {
 impl ManifestMetadata {
     /// Parse from metadata in avro file.
     pub fn parse(meta: &HashMap<String, Vec<u8>>) -> Result<Self> {
-        let schema = {
+        let schema = Arc::new({
             let bs = meta.get("schema").ok_or_else(|| {
                 Error::new(
                     ErrorKind::DataInvalid,
@@ -733,7 +730,7 @@ impl ManifestMetadata {
                 )
                 .with_source(err)
             })?
-        };
+        });
         let schema_id: i32 = meta
             .get("schema-id")
             .map(|bs| {
@@ -776,7 +773,10 @@ impl ManifestMetadata {
                 })
                 .transpose()?
                 .unwrap_or(0);
-            PartitionSpec { spec_id, fields }
+            BoundPartitionSpec::builder(schema.clone())
+                .with_spec_id(spec_id)
+                .add_unbound_fields(fields.into_iter().map(|f| f.into_unbound()))?
+                .build()?
         };
         let format_version = if let Some(bs) = meta.get("format-version") {
             serde_json::from_slice::<FormatVersion>(bs).map_err(|err| {
@@ -1519,82 +1519,82 @@ mod tests {
 
     #[tokio::test]
     async fn test_parse_manifest_v2_unpartition() {
+        let schema = Arc::new(
+            Schema::builder()
+                .with_fields(vec![
+                    // id v_int v_long v_float v_double v_varchar v_bool v_date v_timestamp v_decimal v_ts_ntz
+                    Arc::new(NestedField::optional(
+                        1,
+                        "id",
+                        Type::Primitive(PrimitiveType::Long),
+                    )),
+                    Arc::new(NestedField::optional(
+                        2,
+                        "v_int",
+                        Type::Primitive(PrimitiveType::Int),
+                    )),
+                    Arc::new(NestedField::optional(
+                        3,
+                        "v_long",
+                        Type::Primitive(PrimitiveType::Long),
+                    )),
+                    Arc::new(NestedField::optional(
+                        4,
+                        "v_float",
+                        Type::Primitive(PrimitiveType::Float),
+                    )),
+                    Arc::new(NestedField::optional(
+                        5,
+                        "v_double",
+                        Type::Primitive(PrimitiveType::Double),
+                    )),
+                    Arc::new(NestedField::optional(
+                        6,
+                        "v_varchar",
+                        Type::Primitive(PrimitiveType::String),
+                    )),
+                    Arc::new(NestedField::optional(
+                        7,
+                        "v_bool",
+                        Type::Primitive(PrimitiveType::Boolean),
+                    )),
+                    Arc::new(NestedField::optional(
+                        8,
+                        "v_date",
+                        Type::Primitive(PrimitiveType::Date),
+                    )),
+                    Arc::new(NestedField::optional(
+                        9,
+                        "v_timestamp",
+                        Type::Primitive(PrimitiveType::Timestamptz),
+                    )),
+                    Arc::new(NestedField::optional(
+                        10,
+                        "v_decimal",
+                        Type::Primitive(PrimitiveType::Decimal {
+                            precision: 36,
+                            scale: 10,
+                        }),
+                    )),
+                    Arc::new(NestedField::optional(
+                        11,
+                        "v_ts_ntz",
+                        Type::Primitive(PrimitiveType::Timestamp),
+                    )),
+                    Arc::new(NestedField::optional(
+                        12,
+                        "v_ts_ns_ntz",
+                        Type::Primitive(PrimitiveType::TimestampNs),
+                    )),
+                ])
+                .build()
+                .unwrap(),
+        );
         let manifest = Manifest {
             metadata: ManifestMetadata {
                 schema_id: 0,
-                schema: Schema::builder()
-                    .with_fields(vec![
-                        // id v_int v_long v_float v_double v_varchar v_bool v_date v_timestamp v_decimal v_ts_ntz
-                        Arc::new(NestedField::optional(
-                            1,
-                            "id",
-                            Type::Primitive(PrimitiveType::Long),
-                        )),
-                        Arc::new(NestedField::optional(
-                            2,
-                            "v_int",
-                            Type::Primitive(PrimitiveType::Int),
-                        )),
-                        Arc::new(NestedField::optional(
-                            3,
-                            "v_long",
-                            Type::Primitive(PrimitiveType::Long),
-                        )),
-                        Arc::new(NestedField::optional(
-                            4,
-                            "v_float",
-                            Type::Primitive(PrimitiveType::Float),
-                        )),
-                        Arc::new(NestedField::optional(
-                            5,
-                            "v_double",
-                            Type::Primitive(PrimitiveType::Double),
-                        )),
-                        Arc::new(NestedField::optional(
-                            6,
-                            "v_varchar",
-                            Type::Primitive(PrimitiveType::String),
-                        )),
-                        Arc::new(NestedField::optional(
-                            7,
-                            "v_bool",
-                            Type::Primitive(PrimitiveType::Boolean),
-                        )),
-                        Arc::new(NestedField::optional(
-                            8,
-                            "v_date",
-                            Type::Primitive(PrimitiveType::Date),
-                        )),
-                        Arc::new(NestedField::optional(
-                            9,
-                            "v_timestamp",
-                            Type::Primitive(PrimitiveType::Timestamptz),
-                        )),
-                        Arc::new(NestedField::optional(
-                            10,
-                            "v_decimal",
-                            Type::Primitive(PrimitiveType::Decimal {
-                                precision: 36,
-                                scale: 10,
-                            }),
-                        )),
-                        Arc::new(NestedField::optional(
-                            11,
-                            "v_ts_ntz",
-                            Type::Primitive(PrimitiveType::Timestamp),
-                        )),
-                        Arc::new(NestedField::optional(
-                            12,
-                            "v_ts_ns_ntz",
-                            Type::Primitive(PrimitiveType::TimestampNs
-                        ))),
-                    ])
-                    .build()
-                    .unwrap(),
-                partition_spec: PartitionSpec {
-                    spec_id: 0,
-                    fields: vec![],
-                },
+                schema: schema.clone(),
+                partition_spec: BoundPartitionSpec::builder(schema).with_spec_id(0).build().unwrap(),
                 content: ManifestContentType::Data,
                 format_version: FormatVersion::V2,
             },
@@ -1633,94 +1633,83 @@ mod tests {
 
     #[tokio::test]
     async fn test_parse_manifest_v2_partition() {
+        let schema = Arc::new(
+            Schema::builder()
+                .with_fields(vec![
+                    Arc::new(NestedField::optional(
+                        1,
+                        "id",
+                        Type::Primitive(PrimitiveType::Long),
+                    )),
+                    Arc::new(NestedField::optional(
+                        2,
+                        "v_int",
+                        Type::Primitive(PrimitiveType::Int),
+                    )),
+                    Arc::new(NestedField::optional(
+                        3,
+                        "v_long",
+                        Type::Primitive(PrimitiveType::Long),
+                    )),
+                    Arc::new(NestedField::optional(
+                        4,
+                        "v_float",
+                        Type::Primitive(PrimitiveType::Float),
+                    )),
+                    Arc::new(NestedField::optional(
+                        5,
+                        "v_double",
+                        Type::Primitive(PrimitiveType::Double),
+                    )),
+                    Arc::new(NestedField::optional(
+                        6,
+                        "v_varchar",
+                        Type::Primitive(PrimitiveType::String),
+                    )),
+                    Arc::new(NestedField::optional(
+                        7,
+                        "v_bool",
+                        Type::Primitive(PrimitiveType::Boolean),
+                    )),
+                    Arc::new(NestedField::optional(
+                        8,
+                        "v_date",
+                        Type::Primitive(PrimitiveType::Date),
+                    )),
+                    Arc::new(NestedField::optional(
+                        9,
+                        "v_timestamp",
+                        Type::Primitive(PrimitiveType::Timestamptz),
+                    )),
+                    Arc::new(NestedField::optional(
+                        10,
+                        "v_decimal",
+                        Type::Primitive(PrimitiveType::Decimal {
+                            precision: 36,
+                            scale: 10,
+                        }),
+                    )),
+                    Arc::new(NestedField::optional(
+                        11,
+                        "v_ts_ntz",
+                        Type::Primitive(PrimitiveType::Timestamp),
+                    )),
+                    Arc::new(NestedField::optional(
+                        12,
+                        "v_ts_ns_ntz",
+                        Type::Primitive(PrimitiveType::TimestampNs),
+                    )),
+                ])
+                .build()
+                .unwrap(),
+        );
         let manifest = Manifest {
             metadata: ManifestMetadata {
                 schema_id: 0,
-                schema: Schema::builder()
-                    .with_fields(vec![
-                        Arc::new(NestedField::optional(
-                            1,
-                            "id",
-                            Type::Primitive(PrimitiveType::Long),
-                        )),
-                        Arc::new(NestedField::optional(
-                            2,
-                            "v_int",
-                            Type::Primitive(PrimitiveType::Int),
-                        )),
-                        Arc::new(NestedField::optional(
-                            3,
-                            "v_long",
-                            Type::Primitive(PrimitiveType::Long),
-                        )),
-                        Arc::new(NestedField::optional(
-                            4,
-                            "v_float",
-                            Type::Primitive(PrimitiveType::Float),
-                        )),
-                        Arc::new(NestedField::optional(
-                            5,
-                            "v_double",
-                            Type::Primitive(PrimitiveType::Double),
-                        )),
-                        Arc::new(NestedField::optional(
-                            6,
-                            "v_varchar",
-                            Type::Primitive(PrimitiveType::String),
-                        )),
-                        Arc::new(NestedField::optional(
-                            7,
-                            "v_bool",
-                            Type::Primitive(PrimitiveType::Boolean),
-                        )),
-                        Arc::new(NestedField::optional(
-                            8,
-                            "v_date",
-                            Type::Primitive(PrimitiveType::Date),
-                        )),
-                        Arc::new(NestedField::optional(
-                            9,
-                            "v_timestamp",
-                            Type::Primitive(PrimitiveType::Timestamptz),
-                        )),
-                        Arc::new(NestedField::optional(
-                            10,
-                            "v_decimal",
-                            Type::Primitive(PrimitiveType::Decimal {
-                                precision: 36,
-                                scale: 10,
-                            }),
-                        )),
-                        Arc::new(NestedField::optional(
-                            11,
-                            "v_ts_ntz",
-                            Type::Primitive(PrimitiveType::Timestamp),
-                        )),
-                        Arc::new(NestedField::optional(
-                            12,
-                            "v_ts_ns_ntz",
-                            Type::Primitive(PrimitiveType::TimestampNs
-                        )))
-                    ])
-                    .build()
-                    .unwrap(),
-                partition_spec: PartitionSpec {
-                    spec_id: 0,
-                    fields: vec![
-                        PartitionField {
-                            name: "v_int".to_string(),
-                            transform: Transform::Identity,
-                            source_id: 2,
-                            field_id: 1000,
-                        },
-                        PartitionField {
-                            name: "v_long".to_string(),
-                            transform: Transform::Identity,
-                            source_id: 3,
-                            field_id: 1001,
-                        },
-                    ],
-                },
+                schema: schema.clone(),
+                partition_spec: BoundPartitionSpec::builder(schema)
+                .with_spec_id(0).add_partition_field("v_int", "v_int", Transform::Identity).unwrap()
+                .add_partition_field("v_long", "v_long", Transform::Identity).unwrap().build().unwrap(),
                 content: ManifestContentType::Data,
                 format_version: FormatVersion::V2,
             },
@@ -1802,34 +1791,34 @@ mod tests {
 
     #[tokio::test]
     async fn test_parse_manifest_v1_unpartition() {
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    Arc::new(NestedField::optional(
+                        1,
+                        "id",
+                        Type::Primitive(PrimitiveType::Int),
+                    )),
+                    Arc::new(NestedField::optional(
+                        2,
+                        "data",
+                        Type::Primitive(PrimitiveType::String),
+                    )),
+                    Arc::new(NestedField::optional(
+                        3,
+                        "comment",
+                        Type::Primitive(PrimitiveType::String),
+                    )),
+                ])
+                .build()
+                .unwrap(),
+        );
         let manifest = Manifest {
             metadata: ManifestMetadata {
                 schema_id: 1,
-                schema: Schema::builder()
-                    .with_schema_id(1)
-                    .with_fields(vec![
-                        Arc::new(NestedField::optional(
-                            1,
-                            "id",
-                            Type::Primitive(PrimitiveType::Int),
-                        )),
-                        Arc::new(NestedField::optional(
-                            2,
-                            "data",
-                            Type::Primitive(PrimitiveType::String),
-                        )),
-                        Arc::new(NestedField::optional(
-                            3,
-                            "comment",
-                            Type::Primitive(PrimitiveType::String),
-                        )),
-                    ])
-                    .build()
-                    .unwrap(),
-                partition_spec: PartitionSpec {
-                    spec_id: 0,
-                    fields: vec![],
-                },
+                schema: schema.clone(),
+                partition_spec: BoundPartitionSpec::builder(schema).with_spec_id(0).build().unwrap(),
                 content: ManifestContentType::Data,
                 format_version: FormatVersion::V1,
             },
@@ -1867,38 +1856,33 @@ mod tests {
 
     #[tokio::test]
     async fn test_parse_manifest_v1_partition() {
+        let schema = Arc::new(
+            Schema::builder()
+                .with_fields(vec![
+                    Arc::new(NestedField::optional(
+                        1,
+                        "id",
+                        Type::Primitive(PrimitiveType::Long),
+                    )),
+                    Arc::new(NestedField::optional(
+                        2,
+                        "data",
+                        Type::Primitive(PrimitiveType::String),
+                    )),
+                    Arc::new(NestedField::optional(
+                        3,
+                        "category",
+                        Type::Primitive(PrimitiveType::String),
+                    )),
+                ])
+                .build()
+                .unwrap(),
+        );
         let manifest = Manifest {
             metadata: ManifestMetadata {
                 schema_id: 0,
-                schema: Schema::builder()
-                    .with_fields(vec![
-                        Arc::new(NestedField::optional(
-                            1,
-                            "id",
-                            Type::Primitive(PrimitiveType::Long),
-                        )),
-                        Arc::new(NestedField::optional(
-                            2,
-                            "data",
-                            Type::Primitive(PrimitiveType::String),
-                        )),
-                        Arc::new(NestedField::optional(
-                            3,
-                            "category",
-                            Type::Primitive(PrimitiveType::String),
-                        )),
-                    ])
-                    .build()
-                    .unwrap(),
-                partition_spec: PartitionSpec {
-                    spec_id: 0,
-                    fields: vec![PartitionField {
-                        name: "category".to_string(),
-                        transform: Transform::Identity,
-                        source_id: 3,
-                        field_id: 1000,
-                    }],
-                },
+                schema: schema.clone(),
+                partition_spec: BoundPartitionSpec::builder(schema).add_partition_field("category", "category", Transform::Identity).unwrap().build().unwrap(),
                 content: ManifestContentType::Data,
                 format_version: FormatVersion::V1,
             },
@@ -1956,28 +1940,28 @@ mod tests {
 
     #[tokio::test]
     async fn test_parse_manifest_with_schema_evolution() {
+        let schema = Arc::new(
+            Schema::builder()
+                .with_fields(vec![
+                    Arc::new(NestedField::optional(
+                        1,
+                        "id",
+                        Type::Primitive(PrimitiveType::Long),
+                    )),
+                    Arc::new(NestedField::optional(
+                        2,
+                        "v_int",
+                        Type::Primitive(PrimitiveType::Int),
+                    )),
+                ])
+                .build()
+                .unwrap(),
+        );
         let manifest = Manifest {
             metadata: ManifestMetadata {
                 schema_id: 0,
-                schema: Schema::builder()
-                    .with_fields(vec![
-                        Arc::new(NestedField::optional(
-                            1,
-                            "id",
-                            Type::Primitive(PrimitiveType::Long),
-                        )),
-                        Arc::new(NestedField::optional(
-                            2,
-                            "v_int",
-                            Type::Primitive(PrimitiveType::Int),
-                        )),
-                    ])
-                    .build()
-                    .unwrap(),
-                partition_spec: PartitionSpec {
-                    spec_id: 0,
-                    fields: vec![],
-                },
+                schema: schema.clone(),
+                partition_spec: BoundPartitionSpec::builder(schema).with_spec_id(0).build().unwrap(),
                 content: ManifestContentType::Data,
                 format_version: FormatVersion::V2,
             },
@@ -2028,28 +2012,28 @@ mod tests {
 
         // Compared with original manifest, the lower_bounds and upper_bounds 
no longer has data for field 3, and
         // other parts should be same.
+        let schema = Arc::new(
+            Schema::builder()
+                .with_fields(vec![
+                    Arc::new(NestedField::optional(
+                        1,
+                        "id",
+                        Type::Primitive(PrimitiveType::Long),
+                    )),
+                    Arc::new(NestedField::optional(
+                        2,
+                        "v_int",
+                        Type::Primitive(PrimitiveType::Int),
+                    )),
+                ])
+                .build()
+                .unwrap(),
+        );
         let expected_manifest = Manifest {
             metadata: ManifestMetadata {
                 schema_id: 0,
-                schema: Schema::builder()
-                    .with_fields(vec![
-                        Arc::new(NestedField::optional(
-                            1,
-                            "id",
-                            Type::Primitive(PrimitiveType::Long),
-                        )),
-                        Arc::new(NestedField::optional(
-                            2,
-                            "v_int",
-                            Type::Primitive(PrimitiveType::Int),
-                        )),
-                    ])
-                    .build()
-                    .unwrap(),
-                partition_spec: PartitionSpec {
-                    spec_id: 0,
-                    fields: vec![],
-                },
+                schema: schema.clone(),
+                partition_spec: 
BoundPartitionSpec::builder(schema).with_spec_id(0).build().unwrap(),
                 content: ManifestContentType::Data,
                 format_version: FormatVersion::V2,
             },
diff --git a/crates/iceberg/src/spec/partition.rs 
b/crates/iceberg/src/spec/partition.rs
index 36763df7..75e5d924 100644
--- a/crates/iceberg/src/spec/partition.rs
+++ b/crates/iceberg/src/spec/partition.rs
@@ -24,14 +24,15 @@ use serde::{Deserialize, Serialize};
 use typed_builder::TypedBuilder;
 
 use super::transform::Transform;
-use super::{NestedField, Schema, StructType};
+use super::{NestedField, Schema, SchemaRef, StructType};
 use crate::{Error, ErrorKind, Result};
 
 pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999;
 pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0;
 
-/// Reference to [`PartitionSpec`].
-pub type PartitionSpecRef = Arc<PartitionSpec>;
+/// Reference to [`BoundPartitionSpec`].
+pub type BoundPartitionSpecRef = Arc<BoundPartitionSpec>;
+
 /// Partition fields capture the transform from table data to partition values.
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)]
 #[serde(rename_all = "kebab-case")]
@@ -54,22 +55,51 @@ impl PartitionField {
     }
 }
 
-///  Partition spec that defines how to produce a tuple of partition values 
from a record.
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)]
+/// Partition spec that defines how to produce a tuple of partition values 
from a record.
+/// A `BoundPartitionSpec` is bound to a specific schema.
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub struct BoundPartitionSpec {
+    /// Identifier for PartitionSpec
+    spec_id: i32,
+    /// Details of the partition spec
+    fields: Vec<PartitionField>,
+    /// The schema this partition spec is bound to
+    schema: SchemaRef,
+    /// Type of the partition spec
+    partition_type: StructType,
+}
+
+/// Reference to [`SchemalessPartitionSpec`].
+pub type SchemalessPartitionSpecRef = Arc<SchemalessPartitionSpec>;
+/// Partition spec that defines how to produce a tuple of partition values 
from a record.
+/// Schemaless partition specs are never constructed manually. They occur when 
a table is mutated
+/// and partition spec and schemas are updated. While old partition specs are 
retained, the bound
+/// schema might not be available anymore as part of the table metadata.
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
 #[serde(rename_all = "kebab-case")]
-pub struct PartitionSpec {
+pub struct SchemalessPartitionSpec {
     /// Identifier for PartitionSpec
-    pub(crate) spec_id: i32,
+    spec_id: i32,
     /// Details of the partition spec
-    pub(crate) fields: Vec<PartitionField>,
+    fields: Vec<PartitionField>,
 }
 
-impl PartitionSpec {
+impl BoundPartitionSpec {
     /// Create partition spec builder
-    pub fn builder(schema: &Schema) -> PartitionSpecBuilder {
+    pub fn builder(schema: impl Into<SchemaRef>) -> PartitionSpecBuilder {
         PartitionSpecBuilder::new(schema)
     }
 
+    /// Get a new unpartitioned partition spec
+    pub fn unpartition_spec(schema: impl Into<SchemaRef>) -> Self {
+        Self {
+            spec_id: DEFAULT_PARTITION_SPEC_ID,
+            fields: vec![],
+            schema: schema.into(),
+            partition_type: StructType::new(vec![]),
+        }
+    }
+
     /// Spec id of the partition spec
     pub fn spec_id(&self) -> i32 {
         self.spec_id
@@ -80,39 +110,21 @@ impl PartitionSpec {
         &self.fields
     }
 
+    /// The schema this partition spec is bound to
+    pub fn schema(&self) -> &Schema {
+        &self.schema
+    }
+
+    /// The schema ref this partition spec is bound to
+    pub fn schema_ref(&self) -> &SchemaRef {
+        &self.schema
+    }
+
     /// Returns if the partition spec is unpartitioned.
     ///
     /// A [`PartitionSpec`] is unpartitioned if it has no fields or all fields 
are [`Transform::Void`] transform.
     pub fn is_unpartitioned(&self) -> bool {
-        self.fields.is_empty()
-            || self
-                .fields
-                .iter()
-                .all(|f| matches!(f.transform, Transform::Void))
-    }
-
-    /// Returns the partition type of this partition spec.
-    pub fn partition_type(&self, schema: &Schema) -> Result<StructType> {
-        let mut fields = Vec::with_capacity(self.fields.len());
-        for partition_field in &self.fields {
-            let field = schema
-                .field_by_id(partition_field.source_id)
-                .ok_or_else(|| {
-                    Error::new(
-                        ErrorKind::DataInvalid,
-                        format!(
-                            "No column with source column id {} in schema 
{:?}",
-                            partition_field.source_id, schema
-                        ),
-                    )
-                })?;
-            let res_type = 
partition_field.transform.result_type(&field.field_type)?;
-            let field =
-                NestedField::optional(partition_field.field_id, 
&partition_field.name, res_type)
-                    .into();
-            fields.push(field);
-        }
-        Ok(StructType::new(fields))
+        self.fields.is_empty() || self.fields.iter().all(|f| f.transform == 
Transform::Void)
     }
 
     /// Turn this partition spec into an unbound partition spec.
@@ -122,6 +134,29 @@ impl PartitionSpec {
         self.into()
     }
 
+    /// Turn this partition spec into a schemaless partition spec.
+    pub fn into_schemaless(self) -> SchemalessPartitionSpec {
+        self.into()
+    }
+
+    /// Check if this partition spec has sequential partition ids.
+    /// Sequential ids start from 1000 and increment by 1 for each field.
+    /// This is required for spec version 1
+    pub fn has_sequential_ids(&self) -> bool {
+        has_sequential_ids(self.fields.iter().map(|f| f.field_id))
+    }
+
+    /// Get the highest field id in the partition spec.
+    /// Returns `None` if the partition spec has no fields.
+    pub fn highest_field_id(&self) -> Option<i32> {
+        self.fields.iter().map(|f| f.field_id).max()
+    }
+
+    /// Returns the partition type of this partition spec.
+    pub fn partition_type(&self) -> &StructType {
+        &self.partition_type
+    }
+
     /// Check if this partition spec is compatible with another partition spec.
     ///
     /// Returns true if the partition spec is equal to the other spec with 
partition field ids ignored and
@@ -131,15 +166,15 @@ impl PartitionSpec {
     /// * Field names
     /// * Source column ids
     /// * Transforms
-    pub fn is_compatible_with(&self, other: &UnboundPartitionSpec) -> bool {
+    pub fn is_compatible_with_schemaless(&self, other: 
&SchemalessPartitionSpec) -> bool {
         if self.fields.len() != other.fields.len() {
             return false;
         }
 
-        for (this_field, other_field) in self.fields.iter().zip(&other.fields) 
{
+        for (this_field, other_field) in 
self.fields.iter().zip(other.fields.iter()) {
             if this_field.source_id != other_field.source_id
-                || this_field.transform != other_field.transform
                 || this_field.name != other_field.name
+                || this_field.transform != other_field.transform
             {
                 return false;
             }
@@ -147,33 +182,40 @@ impl PartitionSpec {
 
         true
     }
+}
 
-    /// Check if this partition spec has sequential partition ids.
-    /// Sequential ids start from 1000 and increment by 1 for each field.
-    /// This is required for spec version 1
-    pub fn has_sequential_ids(&self) -> bool {
-        for (index, field) in self.fields.iter().enumerate() {
-            let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64)
-                .checked_add(1)
-                .and_then(|id| id.checked_add(index as i64))
-                .unwrap_or(i64::MAX);
+impl SchemalessPartitionSpec {
+    /// Fields of the partition spec
+    pub fn fields(&self) -> &[PartitionField] {
+        &self.fields
+    }
 
-            if field.field_id as i64 != expected_id {
-                return false;
-            }
+    /// Spec id of the partition spec
+    pub fn spec_id(&self) -> i32 {
+        self.spec_id
+    }
+
+    /// Bind this schemaless partition spec to a schema.
+    pub fn bind(self, schema: impl Into<SchemaRef>) -> 
Result<BoundPartitionSpec> {
+        PartitionSpecBuilder::new_from_unbound(self.into_unbound(), 
schema)?.build()
+    }
+
+    /// Get a new unpartitioned partition spec
+    pub fn unpartition_spec() -> Self {
+        Self {
+            spec_id: DEFAULT_PARTITION_SPEC_ID,
+            fields: vec![],
         }
+    }
 
-        true
+    /// Returns the partition type of this partition spec.
+    pub fn partition_type(&self, schema: &Schema) -> Result<StructType> {
+        PartitionSpecBuilder::partition_type(&self.fields, schema)
     }
 
-    /// Get the highest field id in the partition spec.
-    /// If the partition spec is unpartitioned, it returns the last 
unpartitioned last assigned id (999).
-    pub fn highest_field_id(&self) -> i32 {
-        self.fields
-            .iter()
-            .map(|f| f.field_id)
-            .max()
-            .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID)
+    /// Convert to unbound partition spec
+    pub fn into_unbound(self) -> UnboundPartitionSpec {
+        self.into()
     }
 }
 
@@ -212,7 +254,7 @@ impl UnboundPartitionSpec {
     }
 
     /// Bind this unbound partition spec to a schema.
-    pub fn bind(self, schema: &Schema) -> Result<PartitionSpec> {
+    pub fn bind(self, schema: impl Into<SchemaRef>) -> 
Result<BoundPartitionSpec> {
         PartitionSpecBuilder::new_from_unbound(self, schema)?.build()
     }
 
@@ -235,6 +277,21 @@ impl UnboundPartitionSpec {
     }
 }
 
+fn has_sequential_ids(field_ids: impl Iterator<Item = i32>) -> bool {
+    for (index, field_id) in field_ids.enumerate() {
+        let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64)
+            .checked_add(1)
+            .and_then(|id| id.checked_add(index as i64))
+            .unwrap_or(i64::MAX);
+
+        if field_id as i64 != expected_id {
+            return false;
+        }
+    }
+
+    true
+}
+
 impl From<PartitionField> for UnboundPartitionField {
     fn from(field: PartitionField) -> Self {
         UnboundPartitionField {
@@ -246,8 +303,17 @@ impl From<PartitionField> for UnboundPartitionField {
     }
 }
 
-impl From<PartitionSpec> for UnboundPartitionSpec {
-    fn from(spec: PartitionSpec) -> Self {
+impl From<BoundPartitionSpec> for UnboundPartitionSpec {
+    fn from(spec: BoundPartitionSpec) -> Self {
+        UnboundPartitionSpec {
+            spec_id: Some(spec.spec_id),
+            fields: spec.fields.into_iter().map(Into::into).collect(),
+        }
+    }
+}
+
+impl From<SchemalessPartitionSpec> for UnboundPartitionSpec {
+    fn from(spec: SchemalessPartitionSpec) -> Self {
         UnboundPartitionSpec {
             spec_id: Some(spec.spec_id),
             fields: spec.fields.into_iter().map(Into::into).collect(),
@@ -255,6 +321,15 @@ impl From<PartitionSpec> for UnboundPartitionSpec {
     }
 }
 
+impl From<BoundPartitionSpec> for SchemalessPartitionSpec {
+    fn from(spec: BoundPartitionSpec) -> Self {
+        SchemalessPartitionSpec {
+            spec_id: spec.spec_id,
+            fields: spec.fields,
+        }
+    }
+}
+
 /// Create a new UnboundPartitionSpec
 #[derive(Debug, Default)]
 pub struct UnboundPartitionSpecBuilder {
@@ -326,26 +401,29 @@ impl UnboundPartitionSpecBuilder {
 
 /// Create valid partition specs for a given schema.
 #[derive(Debug)]
-pub struct PartitionSpecBuilder<'a> {
+pub struct PartitionSpecBuilder {
     spec_id: Option<i32>,
     last_assigned_field_id: i32,
     fields: Vec<UnboundPartitionField>,
-    schema: &'a Schema,
+    schema: SchemaRef,
 }
 
-impl<'a> PartitionSpecBuilder<'a> {
+impl PartitionSpecBuilder {
     /// Create a new partition spec builder with the given schema.
-    pub fn new(schema: &'a Schema) -> Self {
+    pub fn new(schema: impl Into<SchemaRef>) -> Self {
         Self {
             spec_id: None,
             fields: vec![],
             last_assigned_field_id: UNPARTITIONED_LAST_ASSIGNED_ID,
-            schema,
+            schema: schema.into(),
         }
     }
 
     /// Create a new partition spec builder from an existing unbound partition 
spec.
-    pub fn new_from_unbound(unbound: UnboundPartitionSpec, schema: &'a Schema) 
-> Result<Self> {
+    pub fn new_from_unbound(
+        unbound: UnboundPartitionSpec,
+        schema: impl Into<SchemaRef>,
+    ) -> Result<Self> {
         let mut builder =
             
Self::new(schema).with_spec_id(unbound.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID));
 
@@ -408,8 +486,8 @@ impl<'a> PartitionSpecBuilder<'a> {
     pub fn add_unbound_field(mut self, field: UnboundPartitionField) -> 
Result<Self> {
         self.check_name_set_and_unique(&field.name)?;
         self.check_for_redundant_partitions(field.source_id, 
&field.transform)?;
-        Self::check_name_does_not_collide_with_schema(&field, self.schema)?;
-        Self::check_transform_compatibility(&field, self.schema)?;
+        Self::check_name_does_not_collide_with_schema(&field, &self.schema)?;
+        Self::check_transform_compatibility(&field, &self.schema)?;
         if let Some(partition_field_id) = field.field_id {
             self.check_partition_id_unique(partition_field_id)?;
         }
@@ -432,11 +510,14 @@ impl<'a> PartitionSpecBuilder<'a> {
     }
 
     /// Build a bound partition spec with the given schema.
-    pub fn build(self) -> Result<PartitionSpec> {
+    pub fn build(self) -> Result<BoundPartitionSpec> {
         let fields = Self::set_field_ids(self.fields, 
self.last_assigned_field_id)?;
-        Ok(PartitionSpec {
+        let partition_type = Self::partition_type(&fields, &self.schema)?;
+        Ok(BoundPartitionSpec {
             spec_id: self.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID),
             fields,
+            partition_type,
+            schema: self.schema,
         })
     }
 
@@ -485,6 +566,32 @@ impl<'a> PartitionSpecBuilder<'a> {
         Ok(bound_fields)
     }
 
+    /// Returns the partition type of this partition spec.
+    fn partition_type(fields: &Vec<PartitionField>, schema: &Schema) -> 
Result<StructType> {
+        let mut struct_fields = Vec::with_capacity(fields.len());
+        for partition_field in fields {
+            let field = schema
+                .field_by_id(partition_field.source_id)
+                .ok_or_else(|| {
+                    Error::new(
+                        // This should never occur as 
check_transform_compatibility
+                        // already ensures that the source field exists in the 
schema
+                        ErrorKind::Unexpected,
+                        format!(
+                            "No column with source column id {} in schema 
{:?}",
+                            partition_field.source_id, schema
+                        ),
+                    )
+                })?;
+            let res_type = 
partition_field.transform.result_type(&field.field_type)?;
+            let field =
+                NestedField::optional(partition_field.field_id, 
&partition_field.name, res_type)
+                    .into();
+            struct_fields.push(field);
+        }
+        Ok(StructType::new(struct_fields))
+    }
+
     /// Ensure that the partition name is unique among columns in the schema.
     /// Duplicate names are allowed if:
     /// 1. The column is sourced from the column with the same name.
@@ -622,7 +729,7 @@ trait CorePartitionSpecValidator {
     fn fields(&self) -> &Vec<UnboundPartitionField>;
 }
 
-impl CorePartitionSpecValidator for PartitionSpecBuilder<'_> {
+impl CorePartitionSpecValidator for PartitionSpecBuilder {
     fn fields(&self) -> &Vec<UnboundPartitionField> {
         &self.fields
     }
@@ -637,7 +744,7 @@ impl CorePartitionSpecValidator for 
UnboundPartitionSpecBuilder {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::spec::Type;
+    use crate::spec::{PrimitiveType, Type};
 
     #[test]
     fn test_partition_spec() {
@@ -663,7 +770,7 @@ mod tests {
         }
         "#;
 
-        let partition_spec: PartitionSpec = 
serde_json::from_str(spec).unwrap();
+        let partition_spec: SchemalessPartitionSpec = 
serde_json::from_str(spec).unwrap();
         assert_eq!(4, partition_spec.fields[0].source_id);
         assert_eq!(1000, partition_spec.fields[0].field_id);
         assert_eq!("ts_day", partition_spec.fields[0].name);
@@ -695,7 +802,7 @@ mod tests {
             ])
             .build()
             .unwrap();
-        let partition_spec = PartitionSpec::builder(&schema)
+        let partition_spec = BoundPartitionSpec::builder(schema.clone())
             .with_spec_id(1)
             .build()
             .unwrap();
@@ -704,7 +811,7 @@ mod tests {
             "Empty partition spec should be unpartitioned"
         );
 
-        let partition_spec = PartitionSpec::builder(&schema)
+        let partition_spec = BoundPartitionSpec::builder(schema.clone())
             .add_unbound_fields(vec![
                 UnboundPartitionField::builder()
                     .source_id(1)
@@ -726,7 +833,7 @@ mod tests {
             "Partition spec with one non void transform should not be 
unpartitioned"
         );
 
-        let partition_spec = PartitionSpec::builder(&schema)
+        let partition_spec = BoundPartitionSpec::builder(schema.clone())
             .with_spec_id(1)
             .add_unbound_fields(vec![
                 UnboundPartitionField::builder()
@@ -809,6 +916,32 @@ mod tests {
         assert_eq!(Transform::Day, partition_spec.fields[0].transform);
     }
 
+    #[test]
+    fn test_new_unpartition() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+        let partition_spec = BoundPartitionSpec::builder(schema.clone())
+            .with_spec_id(0)
+            .build()
+            .unwrap();
+        let partition_type = partition_spec.partition_type();
+        assert_eq!(0, partition_type.fields().len());
+
+        let unpartition_spec = BoundPartitionSpec::unpartition_spec(schema);
+        assert_eq!(partition_spec, unpartition_spec);
+    }
+
     #[test]
     fn test_partition_type() {
         let spec = r#"
@@ -833,7 +966,7 @@ mod tests {
             }
             "#;
 
-        let partition_spec: PartitionSpec = 
serde_json::from_str(spec).unwrap();
+        let partition_spec: SchemalessPartitionSpec = 
serde_json::from_str(spec).unwrap();
         let schema = Schema::builder()
             .with_fields(vec![
                 NestedField::required(1, "id", 
Type::Primitive(crate::spec::PrimitiveType::Int))
@@ -909,7 +1042,7 @@ mod tests {
             }
             "#;
 
-        let partition_spec: PartitionSpec = 
serde_json::from_str(spec).unwrap();
+        let partition_spec: SchemalessPartitionSpec = 
serde_json::from_str(spec).unwrap();
         let schema = Schema::builder()
             .with_fields(vec![
                 NestedField::required(1, "id", 
Type::Primitive(crate::spec::PrimitiveType::Int))
@@ -976,7 +1109,7 @@ mod tests {
         }
         "#;
 
-        let partition_spec: PartitionSpec = 
serde_json::from_str(spec).unwrap();
+        let partition_spec: SchemalessPartitionSpec = 
serde_json::from_str(spec).unwrap();
         let schema = Schema::builder()
             .with_fields(vec![
                 NestedField::required(1, "id", 
Type::Primitive(crate::spec::PrimitiveType::Int))
@@ -994,6 +1127,50 @@ mod tests {
         assert!(partition_spec.partition_type(&schema).is_err());
     }
 
+    #[test]
+    fn test_schemaless_bind_schema_keeps_field_ids_and_spec_id() {
+        let schema: Schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_spec = BoundPartitionSpec::builder(schema.clone())
+            .with_spec_id(99)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: Some(1010),
+                name: "id".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 2,
+                field_id: Some(1001),
+                name: "name_void".to_string(),
+                transform: Transform::Void,
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let schemaless_partition_spec = 
SchemalessPartitionSpec::from(partition_spec.clone());
+        let bound_partition_spec = 
schemaless_partition_spec.bind(schema).unwrap();
+
+        assert_eq!(partition_spec, bound_partition_spec);
+        assert_eq!(partition_spec.fields[0].field_id, 1010);
+        assert_eq!(partition_spec.fields[1].field_id, 1001);
+        assert_eq!(bound_partition_spec.spec_id(), 99);
+    }
+
     #[test]
     fn test_builder_disallow_duplicate_names() {
         UnboundPartitionSpec::builder()
@@ -1018,7 +1195,7 @@ mod tests {
             ])
             .build()
             .unwrap();
-        PartitionSpec::builder(&schema)
+        BoundPartitionSpec::builder(schema.clone())
             .add_unbound_field(UnboundPartitionField {
                 source_id: 1,
                 field_id: Some(1000),
@@ -1056,7 +1233,7 @@ mod tests {
             ])
             .build()
             .unwrap();
-        let spec = PartitionSpec::builder(&schema)
+        let spec = BoundPartitionSpec::builder(schema.clone())
             .with_spec_id(1)
             .add_unbound_field(UnboundPartitionField {
                 source_id: 1,
@@ -1104,26 +1281,33 @@ mod tests {
             .build()
             .unwrap();
 
-        PartitionSpec::builder(&schema)
+        BoundPartitionSpec::builder(schema.clone())
             .with_spec_id(1)
             .build()
             .unwrap();
 
-        let spec = PartitionSpec::builder(&schema)
+        let spec = BoundPartitionSpec::builder(schema.clone())
             .with_spec_id(1)
             .add_partition_field("id", "id_bucket[16]", Transform::Bucket(16))
             .unwrap()
             .build()
             .unwrap();
 
-        assert_eq!(spec, PartitionSpec {
+        assert_eq!(spec, BoundPartitionSpec {
             spec_id: 1,
+            schema: schema.into(),
             fields: vec![PartitionField {
                 source_id: 1,
                 field_id: 1000,
                 name: "id_bucket[16]".to_string(),
                 transform: Transform::Bucket(16),
-            }]
+            }],
+            partition_type: StructType::new(vec![NestedField::optional(
+                1000,
+                "id_bucket[16]",
+                Type::Primitive(PrimitiveType::Int)
+            )
+            .into()])
         });
     }
 
@@ -1139,12 +1323,12 @@ mod tests {
             .build()
             .unwrap();
 
-        PartitionSpec::builder(&schema)
+        BoundPartitionSpec::builder(schema.clone())
             .with_spec_id(1)
             .build()
             .unwrap();
 
-        let err = PartitionSpec::builder(&schema)
+        let err = BoundPartitionSpec::builder(schema)
             .with_spec_id(1)
             .add_unbound_field(UnboundPartitionField {
                 source_id: 1,
@@ -1172,12 +1356,12 @@ mod tests {
             .build()
             .unwrap();
 
-        PartitionSpec::builder(&schema)
+        BoundPartitionSpec::builder(schema.clone())
             .with_spec_id(1)
             .build()
             .unwrap();
 
-        PartitionSpec::builder(&schema)
+        BoundPartitionSpec::builder(schema.clone())
             .with_spec_id(1)
             .add_unbound_field(UnboundPartitionField {
                 source_id: 1,
@@ -1190,7 +1374,7 @@ mod tests {
             .unwrap();
 
         // Not OK for different source id
-        PartitionSpec::builder(&schema)
+        BoundPartitionSpec::builder(schema)
             .with_spec_id(1)
             .add_unbound_field(UnboundPartitionField {
                 source_id: 2,
@@ -1224,7 +1408,7 @@ mod tests {
             .unwrap();
 
         // Valid
-        PartitionSpec::builder(&schema)
+        BoundPartitionSpec::builder(schema.clone())
             .with_spec_id(1)
             .add_unbound_fields(vec![
                 UnboundPartitionField {
@@ -1245,7 +1429,7 @@ mod tests {
             .unwrap();
 
         // Invalid
-        PartitionSpec::builder(&schema)
+        BoundPartitionSpec::builder(schema)
             .with_spec_id(1)
             .add_unbound_fields(vec![
                 UnboundPartitionField {
@@ -1291,7 +1475,7 @@ mod tests {
             .build()
             .unwrap();
 
-        PartitionSpec::builder(&schema)
+        BoundPartitionSpec::builder(schema)
             .with_spec_id(1)
             .add_unbound_field(UnboundPartitionField {
                 source_id: 1,
@@ -1342,7 +1526,7 @@ mod tests {
             .build()
             .unwrap();
 
-        let partition_spec_1 = PartitionSpec::builder(&schema)
+        let partition_spec_1 = BoundPartitionSpec::builder(schema.clone())
             .with_spec_id(1)
             .add_unbound_field(UnboundPartitionField {
                 source_id: 1,
@@ -1354,7 +1538,7 @@ mod tests {
             .build()
             .unwrap();
 
-        let partition_spec_2 = PartitionSpec::builder(&schema)
+        let partition_spec_2 = BoundPartitionSpec::builder(schema)
             .with_spec_id(1)
             .add_unbound_field(UnboundPartitionField {
                 source_id: 1,
@@ -1366,7 +1550,7 @@ mod tests {
             .build()
             .unwrap();
 
-        
assert!(partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));
+        
assert!(partition_spec_1.is_compatible_with_schemaless(&partition_spec_2.into_schemaless()));
     }
 
     #[test]
@@ -1381,7 +1565,7 @@ mod tests {
             .build()
             .unwrap();
 
-        let partition_spec_1 = PartitionSpec::builder(&schema)
+        let partition_spec_1 = BoundPartitionSpec::builder(schema.clone())
             .with_spec_id(1)
             .add_unbound_field(UnboundPartitionField {
                 source_id: 1,
@@ -1393,7 +1577,7 @@ mod tests {
             .build()
             .unwrap();
 
-        let partition_spec_2 = PartitionSpec::builder(&schema)
+        let partition_spec_2 = BoundPartitionSpec::builder(schema)
             .with_spec_id(1)
             .add_unbound_field(UnboundPartitionField {
                 source_id: 1,
@@ -1405,7 +1589,9 @@ mod tests {
             .build()
             .unwrap();
 
-        
assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));
+        assert!(
+            
!partition_spec_1.is_compatible_with_schemaless(&partition_spec_2.into_schemaless())
+        );
     }
 
     #[test]
@@ -1424,7 +1610,7 @@ mod tests {
             .build()
             .unwrap();
 
-        let partition_spec_1 = PartitionSpec::builder(&schema)
+        let partition_spec_1 = BoundPartitionSpec::builder(schema.clone())
             .with_spec_id(1)
             .add_unbound_field(UnboundPartitionField {
                 source_id: 1,
@@ -1436,7 +1622,7 @@ mod tests {
             .build()
             .unwrap();
 
-        let partition_spec_2 = PartitionSpec::builder(&schema)
+        let partition_spec_2 = BoundPartitionSpec::builder(schema)
             .with_spec_id(1)
             .add_unbound_field(UnboundPartitionField {
                 source_id: 2,
@@ -1448,7 +1634,9 @@ mod tests {
             .build()
             .unwrap();
 
-        
assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));
+        assert!(
+            
!partition_spec_1.is_compatible_with_schemaless(&partition_spec_2.into_schemaless())
+        );
     }
 
     #[test]
@@ -1467,7 +1655,7 @@ mod tests {
             .build()
             .unwrap();
 
-        let partition_spec_1 = PartitionSpec::builder(&schema)
+        let partition_spec_1 = BoundPartitionSpec::builder(schema.clone())
             .with_spec_id(1)
             .add_unbound_field(UnboundPartitionField {
                 source_id: 1,
@@ -1486,7 +1674,7 @@ mod tests {
             .build()
             .unwrap();
 
-        let partition_spec_2 = PartitionSpec::builder(&schema)
+        let partition_spec_2 = BoundPartitionSpec::builder(schema)
             .with_spec_id(1)
             .add_unbound_field(UnboundPartitionField {
                 source_id: 2,
@@ -1505,17 +1693,20 @@ mod tests {
             .build()
             .unwrap();
 
-        
assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));
+        assert!(
+            
!partition_spec_1.is_compatible_with_schemaless(&partition_spec_2.into_schemaless())
+        );
     }
 
     #[test]
     fn test_highest_field_id_unpartitioned() {
-        let spec = 
PartitionSpec::builder(&Schema::builder().with_fields(vec![]).build().unwrap())
-            .with_spec_id(1)
-            .build()
-            .unwrap();
+        let spec =
+            
BoundPartitionSpec::builder(Schema::builder().with_fields(vec![]).build().unwrap())
+                .with_spec_id(1)
+                .build()
+                .unwrap();
 
-        assert_eq!(UNPARTITIONED_LAST_ASSIGNED_ID, spec.highest_field_id());
+        assert!(spec.highest_field_id().is_none());
     }
 
     #[test]
@@ -1534,7 +1725,7 @@ mod tests {
             .build()
             .unwrap();
 
-        let spec = PartitionSpec::builder(&schema)
+        let spec = BoundPartitionSpec::builder(schema)
             .with_spec_id(1)
             .add_unbound_field(UnboundPartitionField {
                 source_id: 1,
@@ -1553,7 +1744,7 @@ mod tests {
             .build()
             .unwrap();
 
-        assert_eq!(1001, spec.highest_field_id());
+        assert_eq!(Some(1001), spec.highest_field_id());
     }
 
     #[test]
@@ -1572,7 +1763,7 @@ mod tests {
             .build()
             .unwrap();
 
-        let spec = PartitionSpec::builder(&schema)
+        let spec = BoundPartitionSpec::builder(schema)
             .with_spec_id(1)
             .add_unbound_field(UnboundPartitionField {
                 source_id: 1,
@@ -1612,7 +1803,7 @@ mod tests {
             .build()
             .unwrap();
 
-        let spec = PartitionSpec::builder(&schema)
+        let spec = BoundPartitionSpec::builder(schema)
             .with_spec_id(1)
             .add_unbound_field(UnboundPartitionField {
                 source_id: 1,
@@ -1652,7 +1843,7 @@ mod tests {
             .build()
             .unwrap();
 
-        let spec = PartitionSpec::builder(&schema)
+        let spec = BoundPartitionSpec::builder(schema)
             .with_spec_id(1)
             .add_unbound_field(UnboundPartitionField {
                 source_id: 1,
diff --git a/crates/iceberg/src/spec/table_metadata.rs 
b/crates/iceberg/src/spec/table_metadata.rs
index cde70937..4a2e3ab7 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -31,8 +31,8 @@ use uuid::Uuid;
 
 use super::snapshot::SnapshotReference;
 use super::{
-    PartitionSpec, PartitionSpecRef, SchemaId, SchemaRef, Snapshot, 
SnapshotRef, SnapshotRetention,
-    SortOrder, SortOrderRef, DEFAULT_PARTITION_SPEC_ID,
+    BoundPartitionSpec, BoundPartitionSpecRef, SchemaId, SchemaRef, 
SchemalessPartitionSpecRef,
+    Snapshot, SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, 
DEFAULT_PARTITION_SPEC_ID,
 };
 use crate::error::{timestamp_ms_to_utc, Result};
 use crate::{Error, ErrorKind, TableCreation};
@@ -118,9 +118,9 @@ pub struct TableMetadata {
     /// ID of the table’s current schema.
     pub(crate) current_schema_id: i32,
     /// A list of partition specs, stored as full partition spec objects.
-    pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
+    pub(crate) partition_specs: HashMap<i32, SchemalessPartitionSpecRef>,
     /// ID of the “current” spec that writers should use by default.
-    pub(crate) default_spec_id: i32,
+    pub(crate) default_spec: BoundPartitionSpecRef,
     /// An integer; the highest assigned partition field ID across all 
partition specs for the table.
     pub(crate) last_partition_id: i32,
     ///A string to string map of table properties. This is used to control 
settings that
@@ -222,21 +222,26 @@ impl TableMetadata {
 
     /// Returns all partition specs.
     #[inline]
-    pub fn partition_specs_iter(&self) -> impl Iterator<Item = 
&PartitionSpecRef> {
+    pub fn partition_specs_iter(&self) -> impl Iterator<Item = 
&SchemalessPartitionSpecRef> {
         self.partition_specs.values()
     }
 
     /// Lookup partition spec by id.
     #[inline]
-    pub fn partition_spec_by_id(&self, spec_id: i32) -> 
Option<&PartitionSpecRef> {
+    pub fn partition_spec_by_id(&self, spec_id: i32) -> 
Option<&SchemalessPartitionSpecRef> {
         self.partition_specs.get(&spec_id)
     }
 
     /// Get default partition spec
     #[inline]
-    pub fn default_partition_spec(&self) -> &PartitionSpecRef {
-        self.partition_spec_by_id(self.default_spec_id)
-            .expect("Default partition spec id set, but not found in table 
metadata")
+    pub fn default_partition_spec(&self) -> &BoundPartitionSpecRef {
+        &self.default_spec
+    }
+
+    #[inline]
+    /// Returns spec id of the "current" partition spec.
+    pub fn default_partition_spec_id(&self) -> i32 {
+        self.default_spec.spec_id()
     }
 
     /// Returns all snapshots
@@ -352,29 +357,18 @@ impl TableMetadata {
         Ok(self)
     }
 
-    /// If the default partition spec is specified but the spec is not present 
in specs, add it
+    /// If the default partition spec is not present in specs, add it
     fn try_normalize_partition_spec(&mut self) -> Result<()> {
-        if self.partition_spec_by_id(self.default_spec_id).is_some() {
-            return Ok(());
-        }
-
-        if self.default_spec_id != DEFAULT_PARTITION_SPEC_ID {
-            return Err(Error::new(
-                ErrorKind::DataInvalid,
-                format!(
-                    "No partition spec exists with the default spec id {}.",
-                    self.default_spec_id
-                ),
-            ));
+        if self
+            .partition_spec_by_id(self.default_spec.spec_id())
+            .is_none()
+        {
+            self.partition_specs.insert(
+                self.default_spec.spec_id(),
+                
Arc::new(Arc::unwrap_or_clone(self.default_spec.clone()).into_schemaless()),
+            );
         }
 
-        let partition_spec = PartitionSpec {
-            spec_id: DEFAULT_PARTITION_SPEC_ID,
-            fields: vec![],
-        };
-        self.partition_specs
-            .insert(DEFAULT_PARTITION_SPEC_ID, Arc::new(partition_spec));
-
         Ok(())
     }
 
@@ -565,6 +559,8 @@ impl TableMetadataBuilder {
             properties,
         } = table_creation;
 
+        let schema: Arc<super::Schema> = Arc::new(schema);
+        let unpartition_spec = 
BoundPartitionSpec::unpartition_spec(schema.clone());
         let partition_specs = match partition_spec {
             Some(_) => {
                 return Err(Error::new(
@@ -573,11 +569,8 @@ impl TableMetadataBuilder {
                 ))
             }
             None => HashMap::from([(
-                DEFAULT_PARTITION_SPEC_ID,
-                Arc::new(PartitionSpec {
-                    spec_id: DEFAULT_PARTITION_SPEC_ID,
-                    fields: vec![],
-                }),
+                unpartition_spec.spec_id(),
+                Arc::new(unpartition_spec.clone().into_schemaless()),
             )]),
         };
 
@@ -607,9 +600,9 @@ impl TableMetadataBuilder {
             last_updated_ms: Utc::now().timestamp_millis(),
             last_column_id: schema.highest_field_id(),
             current_schema_id: schema.schema_id(),
-            schemas: HashMap::from([(schema.schema_id(), Arc::new(schema))]),
+            schemas: HashMap::from([(schema.schema_id(), schema)]),
             partition_specs,
-            default_spec_id: DEFAULT_PARTITION_SPEC_ID,
+            default_spec: BoundPartitionSpecRef::new(unpartition_spec),
             last_partition_id: 0,
             properties,
             current_snapshot_id: None,
@@ -661,8 +654,8 @@ pub(super) mod _serde {
     use crate::spec::schema::_serde::{SchemaV1, SchemaV2};
     use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2};
     use crate::spec::{
-        PartitionField, PartitionSpec, Schema, Snapshot, SnapshotReference, 
SnapshotRetention,
-        SortOrder,
+        BoundPartitionSpec, PartitionField, Schema, SchemaRef, 
SchemalessPartitionSpec, Snapshot,
+        SnapshotReference, SnapshotRetention, SortOrder,
     };
     use crate::{Error, ErrorKind};
 
@@ -685,7 +678,7 @@ pub(super) mod _serde {
         pub last_column_id: i32,
         pub schemas: Vec<SchemaV2>,
         pub current_schema_id: i32,
-        pub partition_specs: Vec<PartitionSpec>,
+        pub partition_specs: Vec<SchemalessPartitionSpec>,
         pub default_spec_id: i32,
         pub last_partition_id: i32,
         #[serde(skip_serializing_if = "Option::is_none")]
@@ -721,7 +714,7 @@ pub(super) mod _serde {
         pub current_schema_id: Option<i32>,
         pub partition_spec: Vec<PartitionField>,
         #[serde(skip_serializing_if = "Option::is_none")]
-        pub partition_specs: Option<Vec<PartitionSpec>>,
+        pub partition_specs: Option<Vec<SchemalessPartitionSpec>>,
         #[serde(skip_serializing_if = "Option::is_none")]
         pub default_spec_id: Option<i32>,
         #[serde(skip_serializing_if = "Option::is_none")]
@@ -809,6 +802,44 @@ pub(super) mod _serde {
                     .map(|schema| Ok((schema.schema_id, 
Arc::new(schema.try_into()?))))
                     .collect::<Result<Vec<_>, Error>>()?,
             );
+
+            let current_schema: &SchemaRef =
+                schemas.get(&value.current_schema_id).ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "No schema exists with the current schema id {}.",
+                            value.current_schema_id
+                        ),
+                    )
+                })?;
+            let partition_specs = HashMap::from_iter(
+                value
+                    .partition_specs
+                    .into_iter()
+                    .map(|x| (x.spec_id(), Arc::new(x))),
+            );
+            let default_spec_id = value.default_spec_id;
+            let default_spec = partition_specs
+                .get(&value.default_spec_id)
+                .map(|schemaless_spec| {
+                    (*schemaless_spec.clone())
+                        .clone()
+                        .bind(current_schema.clone())
+                })
+                .transpose()?
+                .or_else(|| {
+                    (DEFAULT_PARTITION_SPEC_ID == default_spec_id)
+                        .then(|| 
BoundPartitionSpec::unpartition_spec(current_schema.clone()))
+                })
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Default partition spec {default_spec_id} not 
found"),
+                    )
+                })?
+                .into();
+
             let mut metadata = TableMetadata {
                 format_version: FormatVersion::V2,
                 table_uuid: value.table_uuid,
@@ -818,13 +849,8 @@ pub(super) mod _serde {
                 last_column_id: value.last_column_id,
                 current_schema_id: value.current_schema_id,
                 schemas,
-                partition_specs: HashMap::from_iter(
-                    value
-                        .partition_specs
-                        .into_iter()
-                        .map(|x| (x.spec_id(), Arc::new(x))),
-                ),
-                default_spec_id: value.default_spec_id,
+                partition_specs,
+                default_spec,
                 last_partition_id: value.last_partition_id,
                 properties: value.properties.unwrap_or_default(),
                 current_snapshot_id,
@@ -876,6 +902,7 @@ pub(super) mod _serde {
             } else {
                 value.current_snapshot_id
             };
+
             let schemas = value
                 .schemas
                 .map(|schemas| {
@@ -900,18 +927,49 @@ pub(super) mod _serde {
                 })
                 .transpose()?
                 .unwrap_or_default();
-            let partition_specs = HashMap::from_iter(
-                value
-                    .partition_specs
-                    .unwrap_or_else(|| {
-                        vec![PartitionSpec {
-                            spec_id: DEFAULT_PARTITION_SPEC_ID,
-                            fields: value.partition_spec,
-                        }]
-                    })
-                    .into_iter()
-                    .map(|x| (x.spec_id(), Arc::new(x))),
-            );
+            let current_schema_id = value
+                .current_schema_id
+                .unwrap_or_else(|| 
schemas.keys().copied().max().unwrap_or_default());
+            let current_schema = schemas
+                .get(&current_schema_id)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "No schema exists with the current schema id {}.",
+                            current_schema_id
+                        ),
+                    )
+                })?
+                .clone();
+
+            let partition_specs = match value.partition_specs {
+                Some(partition_specs) => partition_specs,
+                None => 
vec![BoundPartitionSpec::builder(current_schema.clone())
+                    .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
+                    
.add_unbound_fields(value.partition_spec.into_iter().map(|f| f.into_unbound()))?
+                    .build()?
+                    .into_schemaless()],
+            }
+            .into_iter()
+            .map(|x| (x.spec_id(), Arc::new(x)))
+            .collect::<HashMap<_, _>>();
+
+            let default_spec_id = value
+                .default_spec_id
+                .unwrap_or_else(|| 
partition_specs.keys().copied().max().unwrap_or_default());
+            let default_spec = partition_specs
+                .get(&default_spec_id)
+                .map(|x| 
Arc::unwrap_or_clone(x.clone()).bind(current_schema.clone()))
+                .transpose()?
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Default partition spec {default_spec_id} not 
found"),
+                    )
+                })?
+                .into();
+
             let mut metadata = TableMetadata {
                 format_version: FormatVersion::V1,
                 table_uuid: value.table_uuid.unwrap_or_default(),
@@ -919,12 +977,8 @@ pub(super) mod _serde {
                 last_sequence_number: 0,
                 last_updated_ms: value.last_updated_ms,
                 last_column_id: value.last_column_id,
-                current_schema_id: value
-                    .current_schema_id
-                    .unwrap_or_else(|| 
schemas.keys().copied().max().unwrap_or_default()),
-                default_spec_id: value
-                    .default_spec_id
-                    .unwrap_or_else(|| 
partition_specs.keys().copied().max().unwrap_or_default()),
+                current_schema_id,
+                default_spec,
                 last_partition_id: value
                     .last_partition_id
                     .unwrap_or_else(|| 
partition_specs.keys().copied().max().unwrap_or_default()),
@@ -998,7 +1052,7 @@ pub(super) mod _serde {
                     .into_values()
                     .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| 
s.as_ref().clone()))
                     .collect(),
-                default_spec_id: v.default_spec_id,
+                default_spec_id: v.default_spec.spec_id(),
                 last_partition_id: v.last_partition_id,
                 properties: Some(v.properties),
                 current_snapshot_id: v.current_snapshot_id.or(Some(-1)),
@@ -1067,18 +1121,14 @@ pub(super) mod _serde {
                         .collect(),
                 ),
                 current_schema_id: Some(v.current_schema_id),
-                partition_spec: v
-                    .partition_specs
-                    .get(&v.default_spec_id)
-                    .map(|x| x.fields().to_vec())
-                    .unwrap_or_default(),
+                partition_spec: v.default_spec.fields().to_vec(),
                 partition_specs: Some(
                     v.partition_specs
                         .into_values()
                         .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| 
s.as_ref().clone()))
                         .collect(),
                 ),
-                default_spec_id: Some(v.default_spec_id),
+                default_spec_id: Some(v.default_spec.spec_id()),
                 last_partition_id: Some(v.last_partition_id),
                 properties: if v.properties.is_empty() {
                     None
@@ -1195,9 +1245,9 @@ mod tests {
     use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
     use crate::spec::table_metadata::TableMetadata;
     use crate::spec::{
-        NestedField, NullOrder, Operation, PartitionField, PartitionSpec, 
PrimitiveType, Schema,
-        Snapshot, SnapshotReference, SnapshotRetention, SortDirection, 
SortField, SortOrder,
-        Summary, Transform, Type, UnboundPartitionField,
+        BoundPartitionSpec, NestedField, NullOrder, Operation, PrimitiveType, 
Schema, Snapshot,
+        SnapshotReference, SnapshotRetention, SortDirection, SortField, 
SortOrder, Summary,
+        Transform, Type, UnboundPartitionField,
     };
     use crate::TableCreation;
 
@@ -1238,6 +1288,12 @@ mod tests {
                                 "name": "struct_name",
                                 "required": true,
                                 "type": "fixed[1]"
+                            },
+                            {
+                                "id": 4,
+                                "name": "ts",
+                                "required": true,
+                                "type": "timestamp"
                             }
                         ]
                     }
@@ -1279,23 +1335,32 @@ mod tests {
 
         let schema = Schema::builder()
             .with_schema_id(1)
-            .with_fields(vec![Arc::new(NestedField::required(
-                1,
-                "struct_name",
-                Type::Primitive(PrimitiveType::Fixed(1)),
-            ))])
+            .with_fields(vec![
+                Arc::new(NestedField::required(
+                    1,
+                    "struct_name",
+                    Type::Primitive(PrimitiveType::Fixed(1)),
+                )),
+                Arc::new(NestedField::required(
+                    4,
+                    "ts",
+                    Type::Primitive(PrimitiveType::Timestamp),
+                )),
+            ])
             .build()
             .unwrap();
 
-        let partition_spec = PartitionSpec {
-            spec_id: 0,
-            fields: vec![PartitionField {
+        let partition_spec = BoundPartitionSpec::builder(schema.clone())
+            .with_spec_id(0)
+            .add_unbound_field(UnboundPartitionField {
                 name: "ts_day".to_string(),
                 transform: Transform::Day,
                 source_id: 4,
-                field_id: 1000,
-            }],
-        };
+                field_id: Some(1000),
+            })
+            .unwrap()
+            .build()
+            .unwrap();
 
         let expected = TableMetadata {
             format_version: FormatVersion::V2,
@@ -1305,8 +1370,11 @@ mod tests {
             last_column_id: 1,
             schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]),
             current_schema_id: 1,
-            partition_specs: HashMap::from_iter(vec![(0, 
partition_spec.into())]),
-            default_spec_id: 0,
+            partition_specs: HashMap::from_iter(vec![(
+                0,
+                partition_spec.clone().into_schemaless().into(),
+            )]),
+            default_spec: partition_spec.into(),
             last_partition_id: 1000,
             default_sort_order_id: 0,
             sort_orders: HashMap::from_iter(vec![(0, 
SortOrder::unsorted_order().into())]),
@@ -1445,7 +1513,8 @@ mod tests {
             .build()
             .unwrap();
 
-        let partition_spec = PartitionSpec::builder(&schema)
+        let schema = Arc::new(schema);
+        let partition_spec = BoundPartitionSpec::builder(schema.clone())
             .with_spec_id(0)
             .add_partition_field("vendor_id", "vendor_id", Transform::Identity)
             .unwrap()
@@ -1472,10 +1541,10 @@ mod tests {
             location: "/home/iceberg/warehouse/nyc/taxis".to_string(),
             last_updated_ms: 1662532818843,
             last_column_id: 5,
-            schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
+            schemas: HashMap::from_iter(vec![(0, schema)]),
             current_schema_id: 0,
-            partition_specs: HashMap::from_iter(vec![(0, 
partition_spec.into())]),
-            default_spec_id: 0,
+            partition_specs: HashMap::from_iter(vec![(0, 
partition_spec.clone().into_schemaless().into())]),
+            default_spec: Arc::new(partition_spec),
             last_partition_id: 1000,
             default_sort_order_id: 0,
             sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]),
@@ -1514,6 +1583,12 @@ mod tests {
                             "name": "struct_name",
                             "required": true,
                             "type": "fixed[1]"
+                        },
+                        {
+                            "id": 4,
+                            "name": "ts",
+                            "required": true,
+                            "type": "timestamp"
                         }
                     ]
                 }
@@ -1614,6 +1689,12 @@ mod tests {
                             "name": "struct_name",
                             "required": true,
                             "type": "fixed[1]"
+                        },
+                        {
+                            "id": 4,
+                            "name": "ts",
+                            "required": true,
+                            "type": "timestamp"
                         }
                     ]
                 }
@@ -1699,6 +1780,12 @@ mod tests {
                             "name": "struct_name",
                             "required": true,
                             "type": "fixed[1]"
+                        },
+                        {
+                            "id": 4,
+                            "name": "ts",
+                            "required": true,
+                            "type": "timestamp"
                         }
                     ]
                 }
@@ -1828,7 +1915,7 @@ mod tests {
             .build()
             .unwrap();
 
-        let partition_spec = PartitionSpec::builder(&schema1)
+        let partition_spec = BoundPartitionSpec::builder(schema2.clone())
             .with_spec_id(0)
             .add_unbound_field(UnboundPartitionField {
                 name: "x".to_string(),
@@ -1889,8 +1976,11 @@ mod tests {
             last_column_id: 3,
             schemas: HashMap::from_iter(vec![(0, Arc::new(schema1)), (1, 
Arc::new(schema2))]),
             current_schema_id: 1,
-            partition_specs: HashMap::from_iter(vec![(0, 
partition_spec.into())]),
-            default_spec_id: 0,
+            partition_specs: HashMap::from_iter(vec![(
+                0,
+                partition_spec.clone().into_schemaless().into(),
+            )]),
+            default_spec: Arc::new(partition_spec),
             last_partition_id: 1000,
             default_sort_order_id: 3,
             sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
@@ -1951,7 +2041,7 @@ mod tests {
             .build()
             .unwrap();
 
-        let partition_spec = PartitionSpec::builder(&schema)
+        let partition_spec = BoundPartitionSpec::builder(schema.clone())
             .with_spec_id(0)
             .add_unbound_field(UnboundPartitionField {
                 name: "x".to_string(),
@@ -1988,8 +2078,11 @@ mod tests {
             last_column_id: 3,
             schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
             current_schema_id: 0,
-            partition_specs: HashMap::from_iter(vec![(0, 
partition_spec.into())]),
-            default_spec_id: 0,
+            partition_specs: HashMap::from_iter(vec![(
+                0,
+                partition_spec.clone().into_schemaless().into(),
+            )]),
+            default_spec: Arc::new(partition_spec),
             last_partition_id: 1000,
             default_sort_order_id: 3,
             sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]),
@@ -2031,7 +2124,7 @@ mod tests {
             .build()
             .unwrap();
 
-        let partition_spec = PartitionSpec::builder(&schema)
+        let partition_spec = BoundPartitionSpec::builder(schema.clone())
             .with_spec_id(0)
             .add_unbound_field(UnboundPartitionField {
                 name: "x".to_string(),
@@ -2051,8 +2144,11 @@ mod tests {
             last_column_id: 3,
             schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]),
             current_schema_id: 0,
-            partition_specs: HashMap::from_iter(vec![(0, 
partition_spec.into())]),
-            default_spec_id: 0,
+            partition_specs: HashMap::from_iter(vec![(
+                0,
+                partition_spec.clone().into_schemaless().into(),
+            )]),
+            default_spec: Arc::new(partition_spec),
             last_partition_id: 0,
             default_sort_order_id: 0,
             // Sort order is added during deserialization for V2 compatibility
@@ -2165,16 +2261,22 @@ mod tests {
     fn test_default_partition_spec() {
         let default_spec_id = 1234;
         let mut table_meta_data = 
get_test_table_metadata("TableMetadataV2Valid.json");
-        table_meta_data.default_spec_id = default_spec_id;
+        let partition_spec =
+            
BoundPartitionSpec::unpartition_spec(table_meta_data.current_schema().clone());
+        table_meta_data.default_spec = partition_spec.clone().into();
         table_meta_data
             .partition_specs
-            .insert(default_spec_id, Arc::new(PartitionSpec::default()));
+            .insert(default_spec_id, 
Arc::new(partition_spec.into_schemaless()));
 
         assert_eq!(
-            table_meta_data.default_partition_spec(),
-            table_meta_data
+            (*table_meta_data.default_partition_spec().clone())
+                .clone()
+                .into_schemaless(),
+            (*table_meta_data
                 .partition_spec_by_id(default_spec_id)
                 .unwrap()
+                .clone())
+            .clone()
         );
     }
     #[test]
@@ -2225,10 +2327,11 @@ mod tests {
             HashMap::from([(
                 0,
                 Arc::new(
-                    
PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap())
+                    
BoundPartitionSpec::builder(table_metadata.schemas.get(&0).unwrap().clone())
                         .with_spec_id(0)
                         .build()
                         .unwrap()
+                        .into_schemaless()
                 )
             )])
         );
diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs 
b/crates/iceberg/src/writer/file_writer/location_generator.rs
index 44326190..def18b58 100644
--- a/crates/iceberg/src/writer/file_writer/location_generator.rs
+++ b/crates/iceberg/src/writer/file_writer/location_generator.rs
@@ -132,7 +132,7 @@ pub(crate) mod test {
     use uuid::Uuid;
 
     use super::LocationGenerator;
-    use crate::spec::{FormatVersion, TableMetadata};
+    use crate::spec::{BoundPartitionSpec, FormatVersion, TableMetadata};
     use crate::writer::file_writer::location_generator::{
         FileNameGenerator, WRITE_DATA_LOCATION, WRITE_FOLDER_STORAGE_LOCATION,
     };
@@ -156,6 +156,7 @@ pub(crate) mod test {
 
     #[test]
     fn test_default_location_generate() {
+        let schema = crate::spec::Schema::builder().build().unwrap();
         let mut table_metadata = TableMetadata {
             format_version: FormatVersion::V2,
             table_uuid: 
Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
@@ -165,7 +166,7 @@ pub(crate) mod test {
             schemas: HashMap::new(),
             current_schema_id: 1,
             partition_specs: HashMap::new(),
-            default_spec_id: 1,
+            default_spec: BoundPartitionSpec::unpartition_spec(schema).into(),
             last_partition_id: 1000,
             default_sort_order_id: 0,
             sort_orders: HashMap::from_iter(vec![]),

Reply via email to