This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 038457bd feat(metadata): PartitionSpec builder & validations (#558)
038457bd is described below

commit 038457bd0a73aa5e8add5389777d776e6561675f
Author: Tobias Pütz <tob...@minio.io>
AuthorDate: Wed Sep 17 22:20:07 2025 +0200

    feat(metadata): PartitionSpec builder & validations (#558)
    
    This PR:
    
    1. adds a functional-options builder for PartitionSpec
    2. implements the RemovePartitionSpecs update
    3. ports tests relating to PartitionSpecs from iceberg-rust
    4. ports missing validations from iceberg-rust to MetadataBuilder
    
    
    It's mostly contained within partitions.go; there's a more fundamental
    change to JSON deserialization in there which initializes the
    deserialization target with `-1` in all int fields, so that we are able
    to determine whether or not we received a certain field and are
    consequently able to reject requests where mandatory IDs are missing.
    
    There's currently still some duplication between the functional-options
    constructor of `PartitionSpec` and `update_spec.go`. I'm still working
    on that one; I hope to arrive at something close to [Java's apply() of
    
BaseUpdatePartitionSpec](https://github.com/apache/iceberg/blob/8871bbcf4ffce83be7d1be8d75bf06e5ce7b36e4/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java#L304)
---
 catalog/catalog.go                                 |   9 +
 catalog/rest/rest.go                               |  23 +-
 partitions.go                                      | 226 ++++++++++++++---
 table/arrow_utils_internal_test.go                 |   1 +
 table/internal/parquet_files_test.go               |   1 +
 table/metadata.go                                  | 276 ++++++++++++++++-----
 table/metadata_builder_internal_test.go            | 165 +++++++++++-
 table/metadata_internal_test.go                    |  47 ++++
 table/orphan_cleanup_integration_test.go           |  21 +-
 ...leMetadataV1PartitionSpecsWithoutDefaultId.json |  58 +++++
 .../TableMetadataV2MissingLastPartitionId.json     |  73 ++++++
 .../TableMetadataV2MissingPartitionSpecs.json      |  67 +++++
 table/transaction_test.go                          |   8 +-
 table/update_spec.go                               |  22 +-
 table/update_spec_test.go                          |  15 +-
 table/updates.go                                   |   8 +-
 table/updates_test.go                              |  12 -
 17 files changed, 885 insertions(+), 147 deletions(-)

diff --git a/catalog/catalog.go b/catalog/catalog.go
index 3fec5df9..fb87ead6 100644
--- a/catalog/catalog.go
+++ b/catalog/catalog.go
@@ -76,6 +76,15 @@ type CreateTableCfg struct {
        Properties    iceberg.Properties
 }
 
+func NewCreateTableCfg() CreateTableCfg {
+       return CreateTableCfg{
+               Location:      "",
+               PartitionSpec: nil,
+               SortOrder:     table.UnsortedSortOrder,
+               Properties:    nil,
+       }
+}
+
 // Catalog for iceberg table operations like create, drop, load, list and 
others.
 type Catalog interface {
        // CatalogType returns the type of the catalog.
diff --git a/catalog/rest/rest.go b/catalog/rest/rest.go
index 1c61a7dd..04983d23 100644
--- a/catalog/rest/rest.go
+++ b/catalog/rest/rest.go
@@ -745,32 +745,21 @@ func (r *Catalog) CreateTable(ctx context.Context, 
identifier table.Identifier,
                return nil, err
        }
 
-       var cfg catalog.CreateTableCfg
+       cfg := catalog.NewCreateTableCfg()
        for _, o := range opts {
                o(&cfg)
        }
 
-       freshSchema, err := iceberg.AssignFreshSchemaIDs(schema, nil)
-       if err != nil {
-               return nil, err
-       }
-
-       freshPartitionSpec, err := 
iceberg.AssignFreshPartitionSpecIDs(cfg.PartitionSpec, schema, freshSchema)
-       if err != nil {
-               return nil, err
-       }
-
-       freshSortOrder, err := table.AssignFreshSortOrderIDs(cfg.SortOrder, 
schema, freshSchema)
-       if err != nil {
-               return nil, err
+       if cfg.SortOrder.Fields == nil && cfg.SortOrder.OrderID == 0 {
+               cfg.SortOrder = table.UnsortedSortOrder
        }
 
        payload := createTableRequest{
                Name:          tbl,
-               Schema:        freshSchema,
+               Schema:        schema,
                Location:      cfg.Location,
-               PartitionSpec: &freshPartitionSpec,
-               WriteOrder:    &freshSortOrder,
+               PartitionSpec: cfg.PartitionSpec,
+               WriteOrder:    &cfg.SortOrder,
                StageCreate:   false,
                Props:         cfg.Properties,
        }
diff --git a/partitions.go b/partitions.go
index 03cb3839..db66abd2 100644
--- a/partitions.go
+++ b/partitions.go
@@ -19,6 +19,7 @@ package iceberg
 
 import (
        "encoding/json"
+       "errors"
        "fmt"
        "iter"
        "net/url"
@@ -30,6 +31,7 @@ import (
 const (
        PartitionDataIDStart   = 1000
        InitialPartitionSpecID = 0
+       unassignedFieldID      = 0
 )
 
 // UnpartitionedSpec is the default unpartitioned spec which can
@@ -85,10 +87,194 @@ type PartitionSpec struct {
        sourceIdToFields map[int][]PartitionField
 }
 
+type PartitionOption func(*PartitionSpec) error
+
+// BindToSchema creates a new PartitionSpec by copying the fields from the
+// existing spec verifying compatibility with the schema.
+//
+// If newSpecID is not nil, it will be used as the spec id for the new spec.
+// Otherwise, the existing spec id will be used.
+// If a field in the spec is incompatible with the schema, an error will be
+// returned.
+func (p *PartitionSpec) BindToSchema(schema *Schema, lastPartitionID *int, 
newSpecID *int, isUnbound bool) (PartitionSpec, error) {
+       opts := make([]PartitionOption, 0)
+       if newSpecID != nil {
+               opts = append(opts, WithSpecID(*newSpecID))
+       } else {
+               opts = append(opts, WithSpecID(p.id))
+       }
+
+       for field := range p.Fields() {
+               opts = append(opts, AddPartitionFieldBySourceID(field.SourceID, 
field.Name, field.Transform, schema, &field.FieldID))
+       }
+
+       freshSpec, err := NewPartitionSpecOpts(opts...)
+       if err != nil {
+               return PartitionSpec{}, err
+       }
+       if err = freshSpec.assignPartitionFieldIds(lastPartitionID); err != nil 
{
+               return PartitionSpec{}, err
+       }
+
+       return freshSpec, err
+}
+
+func NewPartitionSpecOpts(opts ...PartitionOption) (PartitionSpec, error) {
+       spec := PartitionSpec{
+               id: 0,
+       }
+       for _, opt := range opts {
+               if err := opt(&spec); err != nil {
+                       return PartitionSpec{}, err
+               }
+       }
+       spec.initialize()
+
+       return spec, nil
+}
+
+func WithSpecID(id int) PartitionOption {
+       return func(p *PartitionSpec) error {
+               p.id = id
+
+               return nil
+       }
+}
+
+func AddPartitionFieldByName(sourceName string, targetName string, transform 
Transform, schema *Schema, fieldID *int) PartitionOption {
+       return func(p *PartitionSpec) error {
+               if schema == nil {
+                       return errors.New("cannot add partition field with nil 
schema")
+               }
+               field, ok := schema.FindFieldByName(sourceName)
+
+               if !ok {
+                       return fmt.Errorf("cannot find source column with name: 
%s in schema", sourceName)
+               }
+               err := p.addSpecFieldInternal(targetName, field, transform, 
fieldID)
+               if err != nil {
+                       return err
+               }
+
+               return nil
+       }
+}
+
+func AddPartitionFieldBySourceID(sourceID int, targetName string, transform 
Transform, schema *Schema, fieldID *int) PartitionOption {
+       return func(p *PartitionSpec) error {
+               if schema == nil {
+                       return errors.New("cannot add partition field with nil 
schema")
+               }
+               field, ok := schema.FindFieldByID(sourceID)
+               if !ok {
+                       return fmt.Errorf("cannot find source column with id: 
%d in schema", sourceID)
+               }
+               err := p.addSpecFieldInternal(targetName, field, transform, 
fieldID)
+               if err != nil {
+                       return err
+               }
+
+               return nil
+       }
+}
+
+func (p *PartitionSpec) addSpecFieldInternal(targetName string, field 
NestedField, transform Transform, fieldID *int) error {
+       if targetName == "" {
+               return errors.New("cannot use empty partition name")
+       }
+       for _, existingField := range p.fields {
+               if existingField.Name == targetName {
+                       return errors.New("duplicate partition name: " + 
targetName)
+               }
+       }
+       var fieldIDValue int
+       if fieldID == nil {
+               fieldIDValue = unassignedFieldID
+       } else {
+               fieldIDValue = *fieldID
+       }
+       if err := p.checkForRedundantPartitions(field.ID, transform); err != 
nil {
+               return err
+       }
+       unboundField := PartitionField{
+               SourceID:  field.ID,
+               FieldID:   fieldIDValue,
+               Name:      targetName,
+               Transform: transform,
+       }
+       p.fields = append(p.fields, unboundField)
+
+       return nil
+}
+
+func (p *PartitionSpec) checkForRedundantPartitions(sourceID int, transform 
Transform) error {
+       if fields, ok := p.sourceIdToFields[sourceID]; ok {
+               for _, f := range fields {
+                       if f.Transform.Equals(transform) {
+                               return fmt.Errorf("cannot add redundant 
partition with source id %d and transform %s. A partition with the same source 
id and transform already exists with name: %s",
+                                       sourceID,
+                                       transform,
+                                       f.Name)
+                       }
+               }
+       }
+
+       return nil
+}
+
+func (p *PartitionSpec) Len() int {
+       return len(p.fields)
+}
+
+func (ps *PartitionSpec) assignPartitionFieldIds(lastAssignedFieldIDPtr *int) 
error {
+       // This is set_field_ids from iceberg-rust
+       // Already assigned partition ids. If we see one of these during 
iteration,
+       // we skip it.
+       assignedIds := make(map[int]struct{})
+       for _, field := range ps.fields {
+               if field.FieldID != unassignedFieldID {
+                       if _, exists := assignedIds[field.FieldID]; exists {
+                               return fmt.Errorf("duplicate field ID provided: 
%d", field.FieldID)
+                       }
+                       assignedIds[field.FieldID] = struct{}{}
+               }
+       }
+
+       lastAssignedFieldID := ps.LastAssignedFieldID()
+       if lastAssignedFieldIDPtr != nil {
+               lastAssignedFieldID = *lastAssignedFieldIDPtr
+       }
+       for i := range ps.fields {
+               if ps.fields[i].FieldID == unassignedFieldID {
+                       // Find the next available ID by incrementing from the 
last known good ID.
+                       lastAssignedFieldID++
+                       for {
+                               if _, exists := 
assignedIds[lastAssignedFieldID]; !exists {
+                                       break // Found an unused ID.
+                               }
+                               lastAssignedFieldID++
+                       }
+
+                       // Assign the new ID and immediately record it as used.
+                       ps.fields[i].FieldID = lastAssignedFieldID
+               } else {
+                       lastAssignedFieldID = max(lastAssignedFieldID, 
ps.fields[i].FieldID)
+               }
+       }
+
+       return nil
+}
+
+// NewPartitionSpec creates a new PartitionSpec with the given fields.
+//
+// The fields are not verified against a schema, use NewPartitionSpecOpts if 
you have to ensure compatibility.
 func NewPartitionSpec(fields ...PartitionField) PartitionSpec {
        return NewPartitionSpecID(InitialPartitionSpecID, fields...)
 }
 
+// NewPartitionSpecID creates a new PartitionSpec with the given fields and id.
+//
+// The fields are not verified against a schema, use NewPartitionSpecOpts if 
you have to ensure compatibility.
 func NewPartitionSpecID(id int, fields ...PartitionField) PartitionSpec {
        ret := PartitionSpec{id: id, fields: fields}
        ret.initialize()
@@ -122,6 +308,10 @@ func (ps PartitionSpec) Equals(other PartitionSpec) bool {
 
 // Fields returns a clone of the partition fields in this spec.
 func (ps *PartitionSpec) Fields() iter.Seq[PartitionField] {
+       if ps.fields == nil {
+               return slices.Values([]PartitionField{})
+       }
+
        return slices.Values(ps.fields)
 }
 
@@ -209,6 +399,11 @@ func (ps *PartitionSpec) LastAssignedFieldID() int {
                }
        }
 
+       if id == unassignedFieldID {
+               // If no fields have been assigned an ID, return the default 
starting ID.
+               return PartitionDataIDStart - 1
+       }
+
        return id
 }
 
@@ -263,37 +458,6 @@ func (ps *PartitionSpec) PartitionToPath(data structLike, 
sc *Schema) string {
        return path.Join(segments...)
 }
 
-// AssignFreshPartitionSpecIDs creates a new PartitionSpec by reassigning the 
field IDs
-// from the old schema to the corresponding fields in the fresh schema, while 
re-assigning
-// the actual Spec IDs to 1000 + the position of the field in the partition 
spec.
-func AssignFreshPartitionSpecIDs(spec *PartitionSpec, old, fresh *Schema) 
(PartitionSpec, error) {
-       if spec == nil {
-               return PartitionSpec{}, nil
-       }
-
-       newFields := make([]PartitionField, 0, len(spec.fields))
-       for pos, field := range spec.fields {
-               origCol, ok := old.FindColumnName(field.SourceID)
-               if !ok {
-                       return PartitionSpec{}, fmt.Errorf("could not find 
field in old schema: %s", field.Name)
-               }
-
-               freshField, ok := fresh.FindFieldByName(origCol)
-               if !ok {
-                       return PartitionSpec{}, fmt.Errorf("could not find 
field in fresh schema: %s", field.Name)
-               }
-
-               newFields = append(newFields, PartitionField{
-                       Name:      field.Name,
-                       SourceID:  freshField.ID,
-                       FieldID:   PartitionDataIDStart + pos,
-                       Transform: field.Transform,
-               })
-       }
-
-       return NewPartitionSpec(newFields...), nil
-}
-
 // GeneratePartitionFieldName returns default partition field name based on 
field transform type
 //
 // The default names are aligned with other client implementations
diff --git a/table/arrow_utils_internal_test.go 
b/table/arrow_utils_internal_test.go
index b39dfa28..7a19d627 100644
--- a/table/arrow_utils_internal_test.go
+++ b/table/arrow_utils_internal_test.go
@@ -44,6 +44,7 @@ func constructTestTable(t *testing.T, writeStats []string) 
(*metadata.FileMetaDa
         "last-column-id": 7,
         "current-schema-id": 0,
                "last-updated-ms": -1,
+               "last-partition-id": 0,
         "schemas": [
             {
                 "type": "struct",
diff --git a/table/internal/parquet_files_test.go 
b/table/internal/parquet_files_test.go
index cde72d8b..ab134c46 100644
--- a/table/internal/parquet_files_test.go
+++ b/table/internal/parquet_files_test.go
@@ -68,6 +68,7 @@ func constructTestTablePrimitiveTypes(t *testing.T) 
(*metadata.FileMetaData, tab
                 ]
             }
         ],
+               "last-partition-id": 0, 
                "last-updated-ms": -1,
         "default-spec-id": 0,
         "partition-specs": [{"spec-id": 0, "fields": []}],
diff --git a/table/metadata.go b/table/metadata.go
index ff3e9a0b..8da21a70 100644
--- a/table/metadata.go
+++ b/table/metadata.go
@@ -163,7 +163,8 @@ type MetadataBuilder struct {
        // >v1 specific
        lastSequenceNumber *int64
        // update tracking
-       lastAddedSchemaID *int
+       lastAddedSchemaID    *int
+       lastAddedPartitionID *int
 }
 
 func NewMetadataBuilder() (*MetadataBuilder, error) {
@@ -284,15 +285,35 @@ func (b *MetadataBuilder) AddSchema(schema 
*iceberg.Schema) (*MetadataBuilder, e
 }
 
 func (b *MetadataBuilder) AddPartitionSpec(spec *iceberg.PartitionSpec, 
initial bool) (*MetadataBuilder, error) {
-       for _, s := range b.specs {
-               if s.ID() == spec.ID() && !initial {
-                       return nil, fmt.Errorf("partition spec with id %d 
already exists", spec.ID())
+       newSpecID := b.reuseOrCreateNewPartitionSpecID(*spec)
+
+       freshSpec, err := spec.BindToSchema(b.CurrentSchema(), 
b.lastPartitionID, &newSpecID, false)
+       if err != nil {
+               return nil, err
+       }
+
+       spec = nil
+       if _, err := b.GetSpecByID(newSpecID); err == nil {
+               if b.lastAddedPartitionID == nil || *b.lastAddedPartitionID != 
newSpecID {
+                       b.updates = append(b.updates, 
NewAddPartitionSpecUpdate(&freshSpec, initial))
+                       b.lastAddedPartitionID = &newSpecID
                }
+
+               return b, nil
        }
 
        maxFieldID := 0
-       for f := range spec.Fields() {
+       fieldCount := 0
+       for f := range freshSpec.Fields() {
                maxFieldID = max(maxFieldID, f.FieldID)
+               if b.formatVersion <= 1 {
+                       expectedID := partitionFieldStartID + fieldCount
+                       if f.FieldID != expectedID {
+                               return nil, fmt.Errorf("v1 constraint: 
partition field IDs are not sequential: expected %d, got %d", expectedID, 
f.FieldID)
+                       }
+                       fieldCount++
+               }
+
        }
 
        prev := partitionFieldStartID - 1
@@ -303,14 +324,15 @@ func (b *MetadataBuilder) AddPartitionSpec(spec 
*iceberg.PartitionSpec, initial
 
        var specs []iceberg.PartitionSpec
        if initial {
-               specs = []iceberg.PartitionSpec{*spec}
+               specs = []iceberg.PartitionSpec{freshSpec}
        } else {
-               specs = append(b.specs, *spec)
+               specs = append(b.specs, freshSpec)
        }
 
        b.specs = specs
        b.lastPartitionID = &lastPartitionID
-       b.updates = append(b.updates, NewAddPartitionSpecUpdate(spec, initial))
+       b.lastAddedPartitionID = &newSpecID
+       b.updates = append(b.updates, NewAddPartitionSpecUpdate(&freshSpec, 
initial))
 
        return b, nil
 }
@@ -439,13 +461,12 @@ func (b *MetadataBuilder) 
SetDefaultSortOrderID(defaultSortOrderID int) (*Metada
 }
 
 func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID int) 
(*MetadataBuilder, error) {
+       lastUsed := false
        if defaultSpecID == -1 {
-               defaultSpecID = maxBy(b.specs, func(s iceberg.PartitionSpec) 
int {
-                       return s.ID()
-               })
-               if !slices.ContainsFunc(b.updates, func(u Update) bool {
-                       return u.Action() == UpdateAddSpec && 
u.(*addPartitionSpecUpdate).Spec.ID() == defaultSpecID
-               }) {
+               if b.lastAddedPartitionID != nil {
+                       lastUsed = true
+                       defaultSpecID = *b.lastAddedPartitionID
+               } else {
                        return nil, errors.New("can't set default spec to last 
added with no added partition specs")
                }
        }
@@ -458,7 +479,11 @@ func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID 
int) (*MetadataBuilder,
                return nil, fmt.Errorf("can't set default spec to spec with id 
%d: %w", defaultSpecID, err)
        }
 
-       b.updates = append(b.updates, NewSetDefaultSpecUpdate(defaultSpecID))
+       if lastUsed {
+               b.updates = append(b.updates, NewSetDefaultSpecUpdate(-1))
+       } else {
+               b.updates = append(b.updates, 
NewSetDefaultSpecUpdate(defaultSpecID))
+       }
        b.defaultSpecID = defaultSpecID
 
        return b, nil
@@ -793,6 +818,20 @@ func (b *MetadataBuilder) Build() (Metadata, error) {
        }
 }
 
+func (b *MetadataBuilder) reuseOrCreateNewPartitionSpecID(newSpec 
iceberg.PartitionSpec) int {
+       newSpecID := 0
+       for _, spec := range b.specs {
+               if spec.Equals(newSpec) {
+                       return spec.ID()
+               }
+               if spec.ID() >= newSpecID {
+                       newSpecID = spec.ID() + 1
+               }
+       }
+
+       return newSpecID
+}
+
 func (b *MetadataBuilder) reuseOrCreateNewSchemaID(newSchema *iceberg.Schema) 
int {
        newSchemaID := newSchema.ID
        for _, schema := range b.schemaList {
@@ -807,6 +846,31 @@ func (b *MetadataBuilder) 
reuseOrCreateNewSchemaID(newSchema *iceberg.Schema) in
        return newSchemaID
 }
 
+func (b *MetadataBuilder) RemovePartitionSpecs(ints []int) (*MetadataBuilder, 
error) {
+       if slices.Contains(ints, b.defaultSpecID) {
+               return nil, fmt.Errorf("can't remove default partition spec 
with id %d", b.defaultSpecID)
+       }
+
+       newSpecs := make([]iceberg.PartitionSpec, 0, len(b.specs)-len(ints))
+       removed := make([]int, len(ints))
+       for _, spec := range b.specs {
+               if slices.Contains(ints, spec.ID()) {
+                       removed = append(removed, spec.ID())
+
+                       continue
+               }
+               newSpecs = append(newSpecs, spec)
+       }
+
+       b.specs = newSpecs
+
+       if len(removed) != 0 {
+               b.updates = append(b.updates, NewRemoveSpecUpdate(ints))
+       }
+
+       return b, nil
+}
+
 // maxBy returns the maximum value of extract(e) for all e in elems.
 // If elems is empty, returns 0.
 func maxBy[S ~[]E, E any](elems S, extract func(e E) int) int {
@@ -852,9 +916,9 @@ func ParseMetadataBytes(b []byte) (Metadata, error) {
        var ret Metadata
        switch ver.FormatVersion {
        case 1:
-               ret = &metadataV1{}
+               ret = initMetadataV1Deser()
        case 2:
-               ret = &metadataV2{}
+               ret = initMetadataV2Deser()
        default:
                return nil, ErrInvalidMetadataFormatVersion
        }
@@ -890,6 +954,15 @@ type commonMetadata struct {
        SnapshotRefs       map[string]SnapshotRef  `json:"refs,omitempty"`
 }
 
+func initCommonMetadataForDeserialization() commonMetadata {
+       return commonMetadata{
+               LastUpdatedMS:   -1,
+               LastColumnId:    -1,
+               CurrentSchemaID: -1,
+               DefaultSpecID:   -1,
+       }
+}
+
 func (c *commonMetadata) Ref() SnapshotRef                     { return 
c.SnapshotRefs[MainBranch] }
 func (c *commonMetadata) Refs() iter.Seq2[string, SnapshotRef] { return 
maps.All(c.SnapshotRefs) }
 func (c *commonMetadata) SnapshotLogs() iter.Seq[SnapshotLogEntry] {
@@ -1131,6 +1204,12 @@ func (c *commonMetadata) validate() error {
        case c.LastColumnId < 0:
                // last-column-id is required
                return fmt.Errorf("%w: missing last-column-id", 
ErrInvalidMetadata)
+       case c.CurrentSchemaID < 0:
+               return fmt.Errorf("%w: no valid schema configuration found in 
table metadata", ErrInvalidMetadata)
+       case c.LastPartitionID == nil:
+               if c.FormatVersion > 1 {
+                       return fmt.Errorf("%w: last-partition-id must be set 
for FormatVersion > 1", ErrInvalidMetadata)
+               }
        }
 
        return nil
@@ -1156,6 +1235,12 @@ type metadataV1 struct {
        commonMetadata
 }
 
+func initMetadataV1Deser() *metadataV1 {
+       return &metadataV1{
+               commonMetadata: initCommonMetadataForDeserialization(),
+       }
+}
+
 func (m *metadataV1) LastSequenceNumber() int64 { return 0 }
 
 func (m *metadataV1) Equals(other Metadata) bool {
@@ -1184,6 +1269,10 @@ func (m *metadataV1) preValidate() {
                m.DefaultSpecID = m.Specs[0].ID()
        }
 
+       if m.DefaultSpecID == -1 && len(m.Specs) > 0 {
+               m.DefaultSpecID = maxBy(m.Specs, func(s iceberg.PartitionSpec) 
int { return s.ID() })
+       }
+
        if m.LastPartitionID == nil {
                id := m.Specs[0].LastAssignedFieldID()
                for _, spec := range m.Specs[1:] {
@@ -1213,6 +1302,16 @@ func (m *metadataV1) UnmarshalJSON(b []byte) error {
                return err
        }
 
+       // CurrentSchemaID was optional in v1, it can also be expressed via 
Schema.
+       if aux.CurrentSchemaID == -1 && aux.Schema != nil {
+               aux.CurrentSchemaID = aux.Schema.ID
+               if !slices.ContainsFunc(aux.SchemaList, func(s *iceberg.Schema) 
bool {
+                       return s.Equals(aux.Schema) && s.ID == 
aux.CurrentSchemaID
+               }) {
+                       aux.SchemaList = append(aux.SchemaList, aux.Schema)
+               }
+       }
+
        m.preValidate()
 
        return m.validate()
@@ -1234,6 +1333,13 @@ type metadataV2 struct {
        commonMetadata
 }
 
+func initMetadataV2Deser() *metadataV2 {
+       return &metadataV2{
+               LastSeqNum:     -1,
+               commonMetadata: initCommonMetadataForDeserialization(),
+       }
+}
+
 func (m *metadataV2) LastSequenceNumber() int64 { return m.LastSeqNum }
 
 func (m *metadataV2) Equals(other Metadata) bool {
@@ -1278,63 +1384,117 @@ func NewMetadata(sc *iceberg.Schema, partitions 
*iceberg.PartitionSpec, sortOrde
 
 // NewMetadataWithUUID is like NewMetadata, but allows the caller to specify 
the UUID of the table rather than creating a new one.
 func NewMetadataWithUUID(sc *iceberg.Schema, partitions 
*iceberg.PartitionSpec, sortOrder SortOrder, location string, props 
iceberg.Properties, tableUuid uuid.UUID) (Metadata, error) {
-       freshSchema, err := iceberg.AssignFreshSchemaIDs(sc, nil)
+       if tableUuid == uuid.Nil {
+               tableUuid = uuid.New()
+       }
+       var err error
+       formatVersion := DefaultFormatVersion
+       if props != nil {
+               verStr, ok := props["format-version"]
+               if ok {
+                       if formatVersion, err = strconv.Atoi(verStr); err != 
nil {
+                               formatVersion = DefaultFormatVersion
+                       }
+                       delete(props, "format-version")
+               }
+       }
+
+       reassignedIds, err := reassignIDs(sc, partitions, sortOrder)
        if err != nil {
                return nil, err
        }
 
-       freshPartitions, err := iceberg.AssignFreshPartitionSpecIDs(partitions, 
sc, freshSchema)
+       builder, err := NewMetadataBuilder()
        if err != nil {
                return nil, err
        }
 
-       freshSortOrder, err := AssignFreshSortOrderIDs(sortOrder, sc, 
freshSchema)
+       _, err = builder.SetFormatVersion(formatVersion)
        if err != nil {
                return nil, err
        }
 
-       if tableUuid == uuid.Nil {
-               tableUuid = uuid.New()
+       _, err = builder.SetUUID(tableUuid)
+       if err != nil {
+               return nil, err
        }
 
-       formatVersion := DefaultFormatVersion
-       if props != nil {
-               verStr, ok := props["format-version"]
-               if ok {
-                       if formatVersion, err = strconv.Atoi(verStr); err != 
nil {
-                               formatVersion = DefaultFormatVersion
-                       }
-                       delete(props, "format-version")
+       _, err = builder.AddSortOrder(&reassignedIds.sortOrder, true)
+       if err != nil {
+               return nil, err
+       }
+
+       _, err = builder.SetDefaultSortOrderID(-1)
+       if err != nil {
+               return nil, err
+       }
+
+       _, err = builder.AddSchema(reassignedIds.schema)
+       if err != nil {
+               return nil, err
+       }
+
+       _, err = builder.SetCurrentSchemaID(-1)
+       if err != nil {
+               return nil, err
+       }
+
+       _, err = builder.AddPartitionSpec(reassignedIds.partitionSpec, true)
+       if err != nil {
+               return nil, err
+       }
+
+       _, err = builder.SetLoc(location)
+       if err != nil {
+               return nil, err
+       }
+
+       _, err = builder.SetProperties(props)
+       if err != nil {
+               return nil, err
+       }
+
+       return builder.Build()
+}
+
+type ReassignedIds struct {
+       schema        *iceberg.Schema
+       partitionSpec *iceberg.PartitionSpec
+       sortOrder     SortOrder
+}
+
+func reassignIDs(sc *iceberg.Schema, partitions *iceberg.PartitionSpec, 
sortOrder SortOrder) (*ReassignedIds, error) {
+       previousMapFn := sc.FindColumnName
+       freshSc, err := iceberg.AssignFreshSchemaIDs(sc, nil)
+       if err != nil {
+               return nil, err
+       }
+
+       if partitions == nil {
+               partitions = iceberg.UnpartitionedSpec
+       }
+       opts := make([]iceberg.PartitionOption, 0)
+       for f := range partitions.Fields() {
+               var s string
+               var ok bool
+               if s, ok = previousMapFn(f.SourceID); !ok {
+                       return nil, fmt.Errorf("field %d not found in schema", 
f.FieldID)
                }
+               opts = append(opts, iceberg.AddPartitionFieldByName(s, f.Name, 
f.Transform, freshSc, nil))
+       }
+       freshPartitions, err := iceberg.NewPartitionSpecOpts(opts...)
+       if err != nil {
+               return nil, err
        }
 
-       lastPartitionID := freshPartitions.LastAssignedFieldID()
-       common := commonMetadata{
-               LastUpdatedMS:      time.Now().UnixMilli(),
-               LastColumnId:       freshSchema.HighestFieldID(),
-               FormatVersion:      formatVersion,
-               UUID:               tableUuid,
-               Loc:                location,
-               SchemaList:         []*iceberg.Schema{freshSchema},
-               CurrentSchemaID:    freshSchema.ID,
-               Specs:              []iceberg.PartitionSpec{freshPartitions},
-               DefaultSpecID:      freshPartitions.ID(),
-               LastPartitionID:    &lastPartitionID,
-               Props:              props,
-               SortOrderList:      []SortOrder{freshSortOrder},
-               DefaultSortOrderID: freshSortOrder.OrderID,
-       }
-
-       switch formatVersion {
-       case 1:
-               return &metadataV1{
-                       commonMetadata: common,
-                       Schema:         freshSchema,
-                       Partition:      
slices.Collect(freshPartitions.Fields()),
-               }, nil
-       case 2:
-               return &metadataV2{commonMetadata: common}, nil
-       default:
-               return nil, fmt.Errorf("invalid format version: %d", 
formatVersion)
+       freshSortOrder, err := AssignFreshSortOrderIDs(sortOrder, sc, freshSc)
+       if err != nil {
+               return nil, err
        }
+
+       return &ReassignedIds{
+               schema:        freshSc,
+               partitionSpec: &freshPartitions,
+               sortOrder:     freshSortOrder,
+       }, nil
 }
diff --git a/table/metadata_builder_internal_test.go 
b/table/metadata_builder_internal_test.go
index 229eab04..1fc900e4 100644
--- a/table/metadata_builder_internal_test.go
+++ b/table/metadata_builder_internal_test.go
@@ -18,9 +18,11 @@
 package table
 
 import (
+       "fmt"
        "testing"
 
        "github.com/apache/iceberg-go"
+       "github.com/davecgh/go-spew/spew"
        "github.com/stretchr/testify/require"
 )
 
@@ -74,14 +76,19 @@ func builderWithoutChanges(formatVersion int) 
MetadataBuilder {
        if err != nil {
                panic(err)
        }
-       _, err = builder.AddPartitionSpec(&partitionSpec, true)
+       _, err = builder.AddSchema(&tableSchema)
        if err != nil {
                panic(err)
        }
-       _, err = builder.AddSchema(&tableSchema)
+       _, err = builder.SetCurrentSchemaID(-1)
        if err != nil {
                panic(err)
        }
+       _, err = builder.AddPartitionSpec(&partitionSpec, true)
+       if err != nil {
+               panic(err)
+       }
+
        meta, err := builder.Build()
        if err != nil {
                panic(err)
@@ -94,6 +101,138 @@ func builderWithoutChanges(formatVersion int) 
MetadataBuilder {
        return *builder
 }
 
+func TestAddRemovePartitionSpec(t *testing.T) {
+       builder := builderWithoutChanges(2)
+       builderRef := &builder
+       i := 1000
+       addedSpec, err := iceberg.NewPartitionSpecOpts(
+               iceberg.WithSpecID(10),
+               iceberg.AddPartitionFieldBySourceID(2, "y", 
iceberg.IdentityTransform{}, builder.schemaList[0], &i),
+               iceberg.AddPartitionFieldBySourceID(3, "z", 
iceberg.IdentityTransform{}, builder.schemaList[0], nil))
+       require.NoError(t, err)
+
+       builderRef, err = builderRef.AddPartitionSpec(&addedSpec, false)
+       require.NoError(t, err)
+       metadata, err := builderRef.Build()
+       require.NoError(t, err)
+       require.NotNil(t, metadata)
+
+       i2 := 1001
+       expectedSpec, err := iceberg.NewPartitionSpecOpts(
+               iceberg.WithSpecID(1),
+               iceberg.AddPartitionFieldBySourceID(2, "y", 
iceberg.IdentityTransform{}, builder.schemaList[0], &i),
+               iceberg.AddPartitionFieldBySourceID(3, "z", 
iceberg.IdentityTransform{}, builder.schemaList[0], &i2))
+       require.NoError(t, err)
+       require.Equal(t, metadata.DefaultPartitionSpec(), 0)
+       require.Equal(t, *metadata.LastPartitionSpecID(), i2)
+       found := false
+       for _, part := range metadata.PartitionSpecs() {
+               if part.ID() == 1 {
+                       found = true
+                       require.True(t, part.Equals(expectedSpec), "expected 
partition spec to match added spec")
+               }
+       }
+       require.True(t, found, "expected partition spec to be added")
+
+       newBuilder, err := MetadataBuilderFromBase(metadata)
+       require.NoError(t, err)
+       // Remove the spec
+       newBuilderRef, err := newBuilder.RemovePartitionSpecs([]int{1})
+       require.NoError(t, err)
+       newBuild, err := newBuilderRef.Build()
+       require.NoError(t, err)
+       require.NotNil(t, newBuild)
+       require.Len(t, newBuilder.updates, 1)
+       require.Len(t, newBuild.PartitionSpecs(), 1)
+       _, err = newBuilder.GetSpecByID(1)
+       require.ErrorContains(t, err, "partition spec with id 1 not found")
+}
+
+func TestSetDefaultPartitionSpec(t *testing.T) {
+       builder := builderWithoutChanges(2)
+       curSchema, err := builder.GetSchemaByID(builder.currentSchemaID)
+       require.NoError(t, err)
+       // Add a partition spec
+       addedSpec, err := iceberg.NewPartitionSpecOpts(
+               iceberg.WithSpecID(10),
+               iceberg.AddPartitionFieldBySourceID(1, "y_bucket[2]", 
iceberg.BucketTransform{NumBuckets: 2}, curSchema, nil))
+       require.NoError(t, err)
+       builderRef, err := builder.AddPartitionSpec(&addedSpec, false)
+       require.NoError(t, err)
+       // Set the default partition spec
+       builderRef, err = builderRef.SetDefaultSpecID(-1)
+       require.NoError(t, err)
+
+       id := 1001
+       expectedSpec, err := iceberg.NewPartitionSpecOpts(
+               iceberg.WithSpecID(1),
+               iceberg.AddPartitionFieldBySourceID(1, "y_bucket[2]", 
iceberg.BucketTransform{NumBuckets: 2}, curSchema, &id))
+       require.NoError(t, err)
+       require.True(t, builderRef.HasChanges())
+       require.Equal(t, len(builderRef.updates), 2)
+       require.True(t, 
builderRef.updates[0].(*addPartitionSpecUpdate).Spec.Equals(expectedSpec))
+       require.Equal(t, -1, 
builderRef.updates[1].(*setDefaultSpecUpdate).SpecID)
+       metadata, err := builderRef.Build()
+       require.NoError(t, err)
+       require.NotNil(t, metadata)
+
+       require.Equal(t, metadata.DefaultPartitionSpec(), 1)
+       require.True(t, expectedSpec.Equals(metadata.PartitionSpec()), 
fmt.Sprintf("expected partition spec to match added spec %s, %s", 
spew.Sdump(expectedSpec), spew.Sdump(metadata.PartitionSpec())))
+       require.Equal(t, *metadata.LastPartitionSpecID(), 1001)
+}
+
+func TestSetExistingDefaultPartitionSpec(t *testing.T) {
+       builder := builderWithoutChanges(2)
+       curSchema, err := builder.GetSchemaByID(builder.currentSchemaID)
+       require.NoError(t, err)
+
+       addedSpec, err := iceberg.NewPartitionSpecOpts(
+               iceberg.WithSpecID(10),
+               iceberg.AddPartitionFieldBySourceID(1, "y_bucket[2]", 
iceberg.BucketTransform{NumBuckets: 2}, curSchema, nil))
+       require.NoError(t, err)
+
+       id := 1001
+       expectedSpec, err := 
iceberg.NewPartitionSpecOpts(iceberg.WithSpecID(1), 
iceberg.AddPartitionFieldBySourceID(1, "y_bucket[2]", 
iceberg.BucketTransform{NumBuckets: 2}, curSchema, &id))
+       require.NoError(t, err)
+
+       builderRef, err := builder.AddPartitionSpec(&addedSpec, false)
+       require.NoError(t, err)
+
+       builderRef, err = builderRef.SetDefaultSpecID(-1)
+       require.NoError(t, err)
+
+       require.True(t, builderRef.HasChanges())
+       require.Len(t, builderRef.updates, 2)
+       require.True(t, 
builderRef.updates[0].(*addPartitionSpecUpdate).Spec.Equals(expectedSpec))
+       require.Equal(t, -1, 
builderRef.updates[1].(*setDefaultSpecUpdate).SpecID)
+
+       metadata, err := builderRef.Build()
+       require.NoError(t, err)
+       require.NotNil(t, metadata)
+       require.Equal(t, 1, metadata.DefaultPartitionSpec())
+
+       require.True(t, expectedSpec.Equals(metadata.PartitionSpec()), 
"expected partition spec to match added spec")
+
+       newBuilder, err := MetadataBuilderFromBase(metadata)
+       require.NoError(t, err)
+       require.NotNil(t, newBuilder)
+
+       newBuilderRef, err := newBuilder.SetDefaultSpecID(0)
+       require.NoError(t, err)
+
+       require.True(t, newBuilderRef.HasChanges(), "expected changes after 
setting default spec")
+       require.Len(t, newBuilderRef.updates, 1, "expected one update")
+       require.Equal(t, 0, 
newBuilderRef.updates[0].(*setDefaultSpecUpdate).SpecID, "expected default 
partition spec to be set to 0")
+
+       newBuild, err := newBuilderRef.Build()
+       require.NoError(t, err)
+       require.NotNil(t, newBuild)
+       require.Equal(t, 0, newBuild.DefaultPartitionSpec(), "expected default 
partition spec to be set to 0")
+
+       newWithoutChanges := builderWithoutChanges(2)
+       require.True(t, 
newWithoutChanges.specs[0].Equals(newBuild.PartitionSpec()), "expected 
partition spec to match added spec")
+}
+
 func TestSetRef(t *testing.T) {
        builder := builderWithoutChanges(2)
        schemaID := 0
@@ -129,6 +268,21 @@ func TestSetRef(t *testing.T) {
        require.Len(t, builder.snapshotLog, 1)
 }
 
+func TestAddPartitionSpecForV1RequiresSequentialIDs(t *testing.T) {
+       builder := builderWithoutChanges(1)
+
+       // Add a partition spec with non-sequential IDs
+       id := 1000
+       id2 := 1002
+       addedSpec, err := iceberg.NewPartitionSpecOpts(iceberg.WithSpecID(10),
+               iceberg.AddPartitionFieldBySourceID(2, "y", 
iceberg.IdentityTransform{}, builder.CurrentSchema(), &id),
+               iceberg.AddPartitionFieldBySourceID(3, "z", 
iceberg.IdentityTransform{}, builder.CurrentSchema(), &id2))
+       require.NoError(t, err)
+
+       _, err = builder.AddPartitionSpec(&addedSpec, false)
+       require.ErrorContains(t, err, "v1 constraint: partition field IDs are 
not sequential: expected 1001, got 1002")
+}
+
 func TestSetBranchSnapshotCreatesBranchIfNotExists(t *testing.T) {
        builder := builderWithoutChanges(2)
        schemaID := 0
@@ -213,3 +367,10 @@ func TestRemoveMainSnapshotRef(t *testing.T) {
        require.NoError(t, err)
        require.NotNil(t, meta)
 }
+
+func TestDefaultSpecCannotBeRemoved(t *testing.T) {
+       builder := builderWithoutChanges(2)
+
+       _, err := builder.RemovePartitionSpecs([]int{0})
+       require.ErrorContains(t, err, "can't remove default partition spec with 
id 0")
+}
diff --git a/table/metadata_internal_test.go b/table/metadata_internal_test.go
index e183fcf3..ef1fc6ac 100644
--- a/table/metadata_internal_test.go
+++ b/table/metadata_internal_test.go
@@ -902,6 +902,7 @@ func TestMetadataV2Validation(t *testing.T) {
                "last-sequence-number": 34,
                "current-schema-id": 0,
                "last-updated-ms": 1602638573590,
+               "last-partition-id": 1000,
                "schemas": [{"type":"struct","schema-id":0,"fields":[]}],
                "default-spec-id": 0,
                "partition-specs": [{"spec-id": 0, "fields": []}],
@@ -917,6 +918,7 @@ func TestMetadataV2Validation(t *testing.T) {
                "last-updated-ms": 1602638573874,
                "last-column-id": 5,
                "current-schema-id": 0,
+               "last-partition-id": 1000,
                "schemas": [{"type":"struct","schema-id":0,"fields":[]}],
                "partition-specs": [{"spec-id": 0, "fields": []}],
                "properties": {},
@@ -933,6 +935,7 @@ func TestMetadataV2Validation(t *testing.T) {
                "current-schema-id": 0,
                "last-updated-ms": 1602638573590,
                "last-column-id": 0,
+               "last-partition-id": 1000,
                "schemas": [{"type":"struct","schema-id":0,"fields":[]}],
                "partition-specs": [{"spec-id": 0, "fields": []}],
                "sort-orders": [],
@@ -952,6 +955,50 @@ func TestMetadataV2Validation(t *testing.T) {
        require.NoError(t, meta3.UnmarshalJSON([]byte(zeroColumnID)))
 }
 
+func TestTableMetadataV1PartitionSpecsWithoutDefaultId(t *testing.T) {
+       // Deserialize the JSON - this should succeed by inferring 
default_spec_id as the max spec ID
+       meta, err := 
getTestTableMetadata("TableMetadataV1PartitionSpecsWithoutDefaultId.json")
+       require.NoError(t, err)
+       require.Equal(t, meta.Version(), 1)
+       require.Equal(t, meta.TableUUID(), 
uuid.MustParse("d20125c8-7284-442c-9aea-15fee620737c"))
+       require.Equal(t, meta.DefaultPartitionSpec(), 2)
+       require.Equal(t, len(meta.PartitionSpecs()), 2)
+       spec := meta.PartitionSpec()
+       require.Equal(t, spec.ID(), 2)
+       require.Equal(t, spec.NumFields(), 1)
+       require.Equal(t, spec.Field(0).Name, "y")
+       require.Equal(t, spec.Field(0).Transform, iceberg.IdentityTransform{})
+       require.Equal(t, spec.Field(0).SourceID, 2)
+}
+
+func TestTableMetadataV2MissingPartitionSpecs(t *testing.T) {
+       meta, err := 
getTestTableMetadata("TableMetadataV2MissingPartitionSpecs.json")
+       require.Error(t, err)
+       require.Nil(t, meta)
+       // TODO: check for specific error
+}
+
+func TestTableMetadataV2MissingLastPartitionId(t *testing.T) {
+       // Similarly to above, this must fail: Go's lack of an Option type previously
+       // meant a missing lastPartitionId was silently decoded as 0; the -1 sentinel
+       // initialization now lets us detect and reject the missing field.
+       meta, err := 
getTestTableMetadata("TableMetadataV2MissingLastPartitionId.json")
+       require.Error(t, err)
+       require.Nil(t, meta)
+       // TODO: check for specific error
+}
+
+func TestDefaultPartitionSpec(t *testing.T) {
+       defaultSpecID := 1234
+       meta, err := getTestTableMetadata("TableMetadataV2Valid.json")
+       require.NoError(t, err)
+       spec := iceberg.NewPartitionSpecID(1234)
+
+       meta.(*metadataV2).DefaultSpecID = spec.ID()
+       meta.(*metadataV2).Specs = append(meta.(*metadataV2).Specs, spec)
+       partitionSpec := meta.PartitionSpec()
+       require.Equal(t, partitionSpec.ID(), defaultSpecID)
+}
+
 func getTestTableMetadata(fileName string) (Metadata, error) {
        fCont, err := os.ReadFile(path.Join("testdata", fileName))
        if err != nil {
diff --git a/table/orphan_cleanup_integration_test.go 
b/table/orphan_cleanup_integration_test.go
index fef31c9d..bc4d3883 100644
--- a/table/orphan_cleanup_integration_test.go
+++ b/table/orphan_cleanup_integration_test.go
@@ -53,16 +53,17 @@ const (
        OrphanFilePrefix  = "orphan_"
 )
 
-var (
-       tableSchemaSimple = iceberg.NewSchemaWithIdentifiers(1,
-               []int{1},
-               iceberg.NestedField{
-                       ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, 
Required: true},
-               iceberg.NestedField{
-                       ID: 2, Name: "name", Type: 
iceberg.PrimitiveTypes.String, Required: true},
-               iceberg.NestedField{
-                       ID: 3, Name: "value", Type: 
iceberg.PrimitiveTypes.Float64, Required: false},
-       )
+var tableSchemaSimple = iceberg.NewSchemaWithIdentifiers(1,
+       []int{1},
+       iceberg.NestedField{
+               ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, 
Required: true,
+       },
+       iceberg.NestedField{
+               ID: 2, Name: "name", Type: iceberg.PrimitiveTypes.String, 
Required: true,
+       },
+       iceberg.NestedField{
+               ID: 3, Name: "value", Type: iceberg.PrimitiveTypes.Float64, 
Required: false,
+       },
 )
 
 func (s *OrphanCleanupIntegrationSuite) loadCatalog() *rest.Catalog {
diff --git a/table/testdata/TableMetadataV1PartitionSpecsWithoutDefaultId.json 
b/table/testdata/TableMetadataV1PartitionSpecsWithoutDefaultId.json
new file mode 100644
index 00000000..6f4ed6a9
--- /dev/null
+++ b/table/testdata/TableMetadataV1PartitionSpecsWithoutDefaultId.json
@@ -0,0 +1,58 @@
+{
+  "format-version": 1,
+  "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c",
+  "location": "s3://bucket/test/location",
+  "last-updated-ms": 1602638573874,
+  "last-column-id": 3,
+  "schema": {
+    "type": "struct",
+    "fields": [
+      {
+        "id": 1,
+        "name": "x",
+        "required": true,
+        "type": "long"
+      },
+      {
+        "id": 2,
+        "name": "y",
+        "required": true,
+        "type": "long",
+        "doc": "comment"
+      },
+      {
+        "id": 3,
+        "name": "z",
+        "required": true,
+        "type": "long"
+      }
+    ]
+  },
+  "partition-specs": [
+    {
+      "spec-id": 1,
+      "fields": [
+        {
+          "name": "x",
+          "transform": "identity",
+          "source-id": 1,
+          "field-id": 1000
+        }
+      ]
+    },
+    {
+      "spec-id": 2,
+      "fields": [
+        {
+          "name": "y",
+          "transform": "identity",
+          "source-id": 2,
+          "field-id": 1001
+        }
+      ]
+    }
+  ],
+  "properties": {},
+  "current-snapshot-id": -1,
+  "snapshots": []
+}
\ No newline at end of file
diff --git a/table/testdata/TableMetadataV2MissingLastPartitionId.json 
b/table/testdata/TableMetadataV2MissingLastPartitionId.json
new file mode 100644
index 00000000..31c2b4ca
--- /dev/null
+++ b/table/testdata/TableMetadataV2MissingLastPartitionId.json
@@ -0,0 +1,73 @@
+{
+  "format-version": 2,
+  "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+  "location": "s3://bucket/test/location",
+  "last-sequence-number": 34,
+  "last-updated-ms": 1602638573590,
+  "last-column-id": 3,
+  "current-schema-id": 0,
+  "schemas": [{
+    "type": "struct",
+    "schema-id": 0,
+    "fields": [
+      {
+        "id": 1,
+        "name": "x",
+        "required": true,
+        "type": "long"
+      },
+      {
+        "id": 2,
+        "name": "y",
+        "required": true,
+        "type": "long",
+        "doc": "comment"
+      },
+      {
+        "id": 3,
+        "name": "z",
+        "required": true,
+        "type": "long"
+      }
+    ]
+  }],
+  "default-spec-id": 0,
+  "partition-specs": [
+    {
+      "spec-id": 0,
+      "fields": [
+        {
+          "name": "x",
+          "transform": "identity",
+          "source-id": 1,
+          "field-id": 1000
+        }
+      ]
+    }
+  ],
+  "default-sort-order-id": 3,
+  "sort-orders": [
+    {
+      "order-id": 3,
+      "fields": [
+        {
+          "transform": "identity",
+          "source-id": 2,
+          "direction": "asc",
+          "null-order": "nulls-first"
+        },
+        {
+          "transform": "bucket[4]",
+          "source-id": 3,
+          "direction": "desc",
+          "null-order": "nulls-last"
+        }
+      ]
+    }
+  ],
+  "properties": {},
+  "current-snapshot-id": -1,
+  "snapshots": [],
+  "snapshot-log": [],
+  "metadata-log": []
+}
\ No newline at end of file
diff --git a/table/testdata/TableMetadataV2MissingPartitionSpecs.json 
b/table/testdata/TableMetadataV2MissingPartitionSpecs.json
new file mode 100644
index 00000000..3ab0a7a1
--- /dev/null
+++ b/table/testdata/TableMetadataV2MissingPartitionSpecs.json
@@ -0,0 +1,67 @@
+{
+  "format-version": 2,
+  "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+  "location": "s3://bucket/test/location",
+  "last-sequence-number": 34,
+  "last-updated-ms": 1602638573590,
+  "last-column-id": 3,
+  "current-schema-id": 0,
+  "schemas": [{
+    "type": "struct",
+    "schema-id": 0,
+    "fields": [
+      {
+        "id": 1,
+        "name": "x",
+        "required": true,
+        "type": "long"
+      },
+      {
+        "id": 2,
+        "name": "y",
+        "required": true,
+        "type": "long",
+        "doc": "comment"
+      },
+      {
+        "id": 3,
+        "name": "z",
+        "required": true,
+        "type": "long"
+      }
+    ]
+  }],
+  "partition-spec": [
+    {
+      "name": "x",
+      "transform": "identity",
+      "source-id": 1,
+      "field-id": 1000
+    }
+  ],
+  "default-sort-order-id": 3,
+  "sort-orders": [
+    {
+      "order-id": 3,
+      "fields": [
+        {
+          "transform": "identity",
+          "source-id": 2,
+          "direction": "asc",
+          "null-order": "nulls-first"
+        },
+        {
+          "transform": "bucket[4]",
+          "source-id": 3,
+          "direction": "desc",
+          "null-order": "nulls-last"
+        }
+      ]
+    }
+  ],
+  "properties": {},
+  "current-snapshot-id": -1,
+  "snapshots": [],
+  "snapshot-log": [],
+  "metadata-log": []
+}
\ No newline at end of file
diff --git a/table/transaction_test.go b/table/transaction_test.go
index db11af9d..a0269cb3 100644
--- a/table/transaction_test.go
+++ b/table/transaction_test.go
@@ -189,9 +189,11 @@ func (s *SparkIntegrationTestSuite) 
TestDifferentDataTypes() {
                iceberg.NestedField{ID: 14, Name: "small_dec", Type: 
iceberg.DecimalTypeOf(8, 2)},
                iceberg.NestedField{ID: 15, Name: "med_dec", Type: 
iceberg.DecimalTypeOf(16, 2)},
                iceberg.NestedField{ID: 16, Name: "large_dec", Type: 
iceberg.DecimalTypeOf(24, 2)},
-               iceberg.NestedField{ID: 17, Name: "list", Type: 
&iceberg.ListType{
-                       ElementID: 18,
-                       Element:   iceberg.PrimitiveTypes.Int32},
+               iceberg.NestedField{
+                       ID: 17, Name: "list", Type: &iceberg.ListType{
+                               ElementID: 18,
+                               Element:   iceberg.PrimitiveTypes.Int32,
+                       },
                },
        )
 
diff --git a/table/update_spec.go b/table/update_spec.go
index df777d63..9447761d 100644
--- a/table/update_spec.go
+++ b/table/update_spec.go
@@ -115,7 +115,10 @@ func (us *UpdateSpec) BuildUpdates() ([]Update, 
[]Requirement, error) {
                }
        }
 
-       newSpec := us.Apply()
+       newSpec, err := us.Apply()
+       if err != nil {
+               return nil, nil, err
+       }
        updates := make([]Update, 0)
        requirements := make([]Requirement, 0)
 
@@ -133,7 +136,7 @@ func (us *UpdateSpec) BuildUpdates() ([]Update, 
[]Requirement, error) {
        return updates, requirements, nil
 }
 
-func (us *UpdateSpec) Apply() iceberg.PartitionSpec {
+func (us *UpdateSpec) Apply() (iceberg.PartitionSpec, error) {
        partitionFields := make([]iceberg.PartitionField, 0)
        partitionNames := make(map[string]bool)
        spec := us.txn.tbl.Metadata().PartitionSpec()
@@ -147,7 +150,7 @@ func (us *UpdateSpec) Apply() iceberg.PartitionSpec {
                                newField, err = 
us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, field.Name, 
field.Transform, partitionNames)
                        }
                        if err != nil {
-                               return iceberg.PartitionSpec{}
+                               return iceberg.PartitionSpec{}, err
                        }
                        partitionFields = append(partitionFields, newField)
                } else if us.txn.tbl.Metadata().Version() == 1 {
@@ -157,15 +160,22 @@ func (us *UpdateSpec) Apply() iceberg.PartitionSpec {
                                newField, err = 
us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, field.Name, 
iceberg.VoidTransform{}, partitionNames)
                        }
                        if err != nil {
-                               return iceberg.PartitionSpec{}
+                               return iceberg.PartitionSpec{}, err
                        }
                        partitionFields = append(partitionFields, newField)
                }
        }
 
        partitionFields = append(partitionFields, us.adds...)
+       opts := make([]iceberg.PartitionOption, len(partitionFields))
+       for i, field := range partitionFields {
+               opts[i] = iceberg.AddPartitionFieldBySourceID(field.SourceID, 
field.Name, field.Transform, us.txn.tbl.Schema(), &field.FieldID)
+       }
 
-       newSpec := iceberg.NewPartitionSpec(partitionFields...)
+       newSpec, err := iceberg.NewPartitionSpecOpts(opts...)
+       if err != nil {
+               return iceberg.PartitionSpec{}, err
+       }
        newSpecId := iceberg.InitialPartitionSpecID
        for _, spec = range us.txn.tbl.Metadata().PartitionSpecs() {
                if newSpec.CompatibleWith(&spec) {
@@ -177,7 +187,7 @@ func (us *UpdateSpec) Apply() iceberg.PartitionSpec {
                }
        }
 
-       return iceberg.NewPartitionSpecID(newSpecId, partitionFields...)
+       return iceberg.NewPartitionSpecID(newSpecId, partitionFields...), nil
 }
 
 func (us *UpdateSpec) Commit() error {
diff --git a/table/update_spec_test.go b/table/update_spec_test.go
index 0fc102aa..ea113111 100644
--- a/table/update_spec_test.go
+++ b/table/update_spec_test.go
@@ -75,7 +75,8 @@ func TestUpdateSpecAddField(t *testing.T) {
                assert.NotNil(t, updates)
                assert.NotNil(t, reqs)
 
-               newSpec := specUpdate.Apply()
+               newSpec, err := specUpdate.Apply()
+               assert.NoError(t, err)
                assert.NotNil(t, newSpec)
                assert.Equal(t, 1, newSpec.ID())
                assert.Equal(t, 1003, newSpec.LastAssignedFieldID())
@@ -198,7 +199,8 @@ func TestUpdateSpecAddField(t *testing.T) {
                assert.NotNil(t, updates)
                assert.NotNil(t, reqs)
 
-               newSpec := specUpdate.Apply()
+               newSpec, err := specUpdate.Apply()
+               assert.NoError(t, err)
                assert.NotNil(t, newSpec)
                assert.Equal(t, "street_void_1001", 
newSpec.FieldsBySourceID(5)[0].Name)
        })
@@ -218,7 +220,8 @@ func TestUpdateSpecAddIdentityField(t *testing.T) {
                assert.NotNil(t, updates)
                assert.NotNil(t, reqs)
 
-               newSpec := specUpdate.Apply()
+               newSpec, err := specUpdate.Apply()
+               assert.NoError(t, err)
                assert.NotNil(t, newSpec)
                assert.Equal(t, 1, newSpec.ID())
                assert.Equal(t, 1003, newSpec.LastAssignedFieldID())
@@ -254,7 +257,8 @@ func TestUpdateSpecRenameField(t *testing.T) {
                assert.NotNil(t, updates)
                assert.NotNil(t, reqs)
 
-               newSpec := updateSpec.Apply()
+               newSpec, err := updateSpec.Apply()
+               assert.NoError(t, err)
                assert.NotNil(t, newSpec)
                assert.Equal(t, 1, newSpec.ID())
                assert.Equal(t, "new_id_identity", 
newSpec.FieldsBySourceID(1)[0].Name)
@@ -318,7 +322,8 @@ func TestUpdateSpecRemoveField(t *testing.T) {
                assert.NotNil(t, updates)
                assert.NotNil(t, reqs)
 
-               newSpec := updateSpec.Apply()
+               newSpec, err := updateSpec.Apply()
+               assert.NoError(t, err)
                assert.NotNil(t, newSpec)
                assert.Equal(t, 1, newSpec.ID())
                assert.Equal(t, 999, newSpec.LastAssignedFieldID())
diff --git a/table/updates.go b/table/updates.go
index af6f139b..f91bd487 100644
--- a/table/updates.go
+++ b/table/updates.go
@@ -553,12 +553,12 @@ func (u *removeSnapshotRefUpdate) Apply(builder 
*MetadataBuilder) error {
 
 type removeSpecUpdate struct {
        baseUpdate
-       SpecIds []int64 `json:"spec-ids"`
+       SpecIds []int `json:"spec-ids"`
 }
 
 // NewRemoveSpecUpdate creates a new Update that removes a list of partition 
specs
 // from the table metadata.
-func NewRemoveSpecUpdate(specIds []int64) *removeSpecUpdate {
+func NewRemoveSpecUpdate(specIds []int) *removeSpecUpdate {
        return &removeSpecUpdate{
                baseUpdate: baseUpdate{ActionName: UpdateRemoveSpec},
                SpecIds:    specIds,
@@ -566,7 +566,9 @@ func NewRemoveSpecUpdate(specIds []int64) *removeSpecUpdate 
{
 }
 
 func (u *removeSpecUpdate) Apply(builder *MetadataBuilder) error {
-       return fmt.Errorf("%w: %s", iceberg.ErrNotImplemented, UpdateRemoveSpec)
+       _, err := builder.RemovePartitionSpecs(u.SpecIds)
+
+       return err
 }
 
 type removeSchemasUpdate struct {
diff --git a/table/updates_test.go b/table/updates_test.go
index 0c5eeea1..c96a97c4 100644
--- a/table/updates_test.go
+++ b/table/updates_test.go
@@ -230,15 +230,3 @@ func TestRemoveSchemas(t *testing.T) {
                }
        })
 }
-
-func TestRemovePartitionSpecs(t *testing.T) {
-       var builder *MetadataBuilder
-       removeSpecs := removeSpecUpdate{
-               SpecIds: []int64{},
-       }
-       t.Run("remove specs should fail", func(t *testing.T) {
-               if err := removeSpecs.Apply(builder); !errors.Is(err, 
iceberg.ErrNotImplemented) {
-                       t.Fatalf("Expected unimplemented error, got %v", err)
-               }
-       })
-}

Reply via email to