This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 194a45f  feat(transaction): Add initial support for update spec in 
transaction API (#467)
194a45f is described below

commit 194a45f88d6c9de41a2cbfe5453ec839fe7257ee
Author: Leon Lin <[email protected]>
AuthorDate: Mon Jul 14 12:38:48 2025 -0700

    feat(transaction): Add initial support for update spec in transaction API 
(#467)
    
    ### Description
    * Add support for update spec in transaction API
    * Reference:
    
https://github.com/apache/iceberg-python/blob/main/pyiceberg/table/update/spec.py
    
    ### Usage
    ```
    specUpdates := txn.UpdateSpec(false) // caseSensitive = false
    
    // Builder methods return *UpdateSpec and can be chained; validation
    // errors are reported by Commit.
    specUpdates.
        AddField("ts", iceberg.YearTransform{}, "year_transform").
        RenameField("id_identity", "new_id_identity").
        RemoveField("field")
    
    if err := specUpdates.Commit(); err != nil {
        return err
    }
    txn.Commit()
    ```
    
    ### Testing
    * `update_spec_test.go`
    
    ---------
    
    Co-authored-by: Matt Topol <[email protected]>
---
 partitions.go             |   6 +-
 table/transaction.go      |   4 +
 table/update_spec.go      | 401 +++++++++++++++++++++++++++++++++++++
 table/update_spec_test.go | 491 ++++++++++++++++++++++++++++++++++++++++++++++
 transforms.go             |   6 +-
 5 files changed, 902 insertions(+), 6 deletions(-)

diff --git a/partitions.go b/partitions.go
index f2923d0..03cb383 100644
--- a/partitions.go
+++ b/partitions.go
@@ -28,7 +28,7 @@ import (
 )
 
 const (
-       partitionDataIDStart   = 1000
+       PartitionDataIDStart   = 1000
        InitialPartitionSpecID = 0
 )
 
@@ -199,7 +199,7 @@ func (ps PartitionSpec) String() string {
 
 func (ps *PartitionSpec) LastAssignedFieldID() int {
        if len(ps.fields) == 0 {
-               return partitionDataIDStart - 1
+               return PartitionDataIDStart - 1
        }
 
        id := ps.fields[0].FieldID
@@ -286,7 +286,7 @@ func AssignFreshPartitionSpecIDs(spec *PartitionSpec, old, 
fresh *Schema) (Parti
                newFields = append(newFields, PartitionField{
                        Name:      field.Name,
                        SourceID:  freshField.ID,
-                       FieldID:   partitionDataIDStart + pos,
+                       FieldID:   PartitionDataIDStart + pos,
                        Transform: field.Transform,
                })
        }
diff --git a/table/transaction.go b/table/transaction.go
index 6379089..bc43def 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -142,6 +142,10 @@ func (t *Transaction) SetProperties(props 
iceberg.Properties) error {
        return nil
 }
 
+// UpdateSpec returns a builder for evolving the partition spec of the table
+// modified by this transaction. caseSensitive controls whether source column
+// names given to AddField/AddIdentity are matched case-sensitively. Operations
+// queued on the builder are applied to the transaction by the builder's Commit.
+func (t *Transaction) UpdateSpec(caseSensitive bool) *UpdateSpec {
+       return NewUpdateSpec(t, caseSensitive)
+}
+
 func (t *Transaction) AppendTable(ctx context.Context, tbl arrow.Table, 
batchSize int64, snapshotProps iceberg.Properties) error {
        rdr := array.NewTableReader(tbl, batchSize)
        defer rdr.Release()
diff --git a/table/update_spec.go b/table/update_spec.go
new file mode 100644
index 0000000..df777d6
--- /dev/null
+++ b/table/update_spec.go
@@ -0,0 +1,401 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "errors"
+       "fmt"
+       "slices"
+
+       "github.com/apache/iceberg-go"
+)
+
+type (
+       // UpdateSpec is a builder for evolving a table's partition specification
+       // within a Transaction.
+       //
+       // AddField, AddIdentity, RemoveField and RenameField only queue
+       // operations. BuildUpdates replays the queue and derives the new spec
+       // (see Apply) along with the table updates and requirements that install
+       // it; Commit does the same and hands the result to the transaction.
+       UpdateSpec struct {
+               // operations holds the queued builder calls, replayed in order.
+               operations []updateSpecOp
+
+               // txn is the transaction whose table is being evolved.
+               txn *Transaction
+               // caseSensitive controls how source column names are bound.
+               caseSensitive bool
+
+               // Fields of the current spec, indexed by name and by
+               // (source ID, transform).
+               nameToField      map[string]iceberg.PartitionField
+               transformToField map[transformKey]iceberg.PartitionField
+
+               // Fields added by this builder, indexed the same way.
+               nameToAddedField      map[string]iceberg.PartitionField
+               transformToAddedField map[transformKey]iceberg.PartitionField
+               // addedTimeFields maps a source column ID to the time-transform
+               // field added for it, so at most one is added per column.
+               addedTimeFields map[int]iceberg.PartitionField
+               // adds lists the added fields in the order they were added.
+               adds []iceberg.PartitionField
+
+               // renames maps an existing field name to its new name.
+               renames map[string]string
+               // deletes is the set of removed field IDs.
+               deletes map[int]bool
+               // lastAssignedFieldId is the counter new partition field IDs are
+               // allocated from (see newFieldId).
+               lastAssignedFieldId int
+       }
+
+       // updateSpecOp is a deferred builder operation: it validates its
+       // arguments against the builder state and records its effect.
+       updateSpecOp func() error
+
+       // transformKey identifies a partition field by source column ID and the
+       // string form of its transform.
+       transformKey struct {
+               SourceId  int
+               Transform string
+       }
+)
+
+// NewUpdateSpec creates an UpdateSpec builder for the table of transaction t.
+//
+// The builder is seeded from the table's current partition spec. Its fields are
+// indexed by name and by (source ID, transform) so later operations can detect
+// duplicates and resolve renames and removals. caseSensitive controls how
+// source column names are bound against the table schema.
+//
+// NOTE(review): the seed is read from t.tbl.Metadata(), not from metadata
+// changes already staged in the transaction — confirm this is intended.
+func NewUpdateSpec(t *Transaction, caseSensitive bool) *UpdateSpec {
+       meta := t.tbl.Metadata()
+       spec := meta.PartitionSpec()
+
+       nameToField := make(map[string]iceberg.PartitionField, spec.NumFields())
+       transformToField := make(map[transformKey]iceberg.PartitionField, spec.NumFields())
+       for field := range spec.Fields() {
+               key := transformKey{SourceId: field.SourceID, Transform: field.Transform.String()}
+               transformToField[key] = field
+               nameToField[field.Name] = field
+       }
+
+       // New field IDs continue after the last partition field ID recorded in
+       // the metadata. When none is recorded, start just below
+       // PartitionDataIDStart so the first new field gets PartitionDataIDStart.
+       lastAssignedFieldID := iceberg.PartitionDataIDStart - 1
+       if last := meta.LastPartitionSpecID(); last != nil {
+               lastAssignedFieldID = *last
+       }
+
+       return &UpdateSpec{
+               txn:                   t,
+               nameToField:           nameToField,
+               nameToAddedField:      make(map[string]iceberg.PartitionField),
+               transformToField:      transformToField,
+               transformToAddedField: make(map[transformKey]iceberg.PartitionField),
+               renames:               make(map[string]string),
+               addedTimeFields:       make(map[int]iceberg.PartitionField),
+               caseSensitive:         caseSensitive,
+               adds:                  make([]iceberg.PartitionField, 0),
+               deletes:               make(map[int]bool),
+               lastAssignedFieldId:   lastAssignedFieldID,
+       }
+}
+
+// AddField queues adding a partition field that applies transform to the
+// source column sourceColName. If partitionFieldName is empty, a name is
+// derived automatically.
+//
+// Validation is deferred. An unknown column, a transform that cannot be
+// applied to the column's type, or a duplicate field is reported by
+// BuildUpdates/Commit. The receiver is returned for chaining.
+func (us *UpdateSpec) AddField(sourceColName string, transform iceberg.Transform, partitionFieldName string) *UpdateSpec {
+       us.operations = append(us.operations, us.addField(sourceColName, transform, partitionFieldName))
+
+       return us
+}
+
+// AddIdentity queues adding an identity partition field on sourceColName with
+// an automatically derived name. It is shorthand for
+// AddField(sourceColName, iceberg.IdentityTransform{}, "").
+func (us *UpdateSpec) AddIdentity(sourceColName string) *UpdateSpec {
+       return us.AddField(sourceColName, iceberg.IdentityTransform{}, "")
+}
+
+// RemoveField queues removing the current-spec partition field called name.
+//
+// BuildUpdates/Commit validate the removal. It fails if name is not a field of
+// the current spec, was added by this builder, or has already been renamed.
+// On format-version 1 tables the field is kept with a void transform instead
+// of being dropped (see Apply). The receiver is returned for chaining.
+func (us *UpdateSpec) RemoveField(name string) *UpdateSpec {
+       us.operations = append(us.operations, us.removeField(name))
+
+       return us
+}
+
+// RenameField queues renaming the current-spec partition field name to
+// newName.
+//
+// BuildUpdates/Commit validate the rename. It fails if name was added by this
+// builder, is not a field of the current spec, or has been removed. The
+// receiver is returned for chaining.
+func (us *UpdateSpec) RenameField(name string, newName string) *UpdateSpec {
+       us.operations = append(us.operations, us.renameField(name, newName))
+
+       return us
+}
+
+func (us *UpdateSpec) BuildUpdates() ([]Update, []Requirement, error) {
+       for _, op := range us.operations {
+               if err := op(); err != nil {
+                       return nil, nil, err
+               }
+       }
+
+       newSpec := us.Apply()
+       updates := make([]Update, 0)
+       requirements := make([]Requirement, 0)
+
+       if us.txn.tbl.Metadata().DefaultPartitionSpec() != newSpec.ID() {
+               if us.isNewPartitionSpec(newSpec.ID()) {
+                       updates = append(updates, 
NewAddPartitionSpecUpdate(&newSpec, false))
+                       updates = append(updates, NewSetDefaultSpecUpdate(-1))
+               } else {
+                       updates = append(updates, 
NewSetDefaultSpecUpdate(newSpec.ID()))
+               }
+               requiredLastAssignedPartitionId := 
us.txn.tbl.Metadata().LastPartitionSpecID()
+               requirements = append(requirements, 
AssertLastAssignedPartitionID(*requiredLastAssignedPartitionId))
+       }
+
+       return updates, requirements, nil
+}
+
+func (us *UpdateSpec) Apply() iceberg.PartitionSpec {
+       partitionFields := make([]iceberg.PartitionField, 0)
+       partitionNames := make(map[string]bool)
+       spec := us.txn.tbl.Metadata().PartitionSpec()
+       for field := range spec.Fields() {
+               var newField iceberg.PartitionField
+               var err error
+               if _, deleted := us.deletes[field.FieldID]; !deleted {
+                       if rename, renamed := us.renames[field.Name]; renamed {
+                               newField, err = 
us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, rename, 
field.Transform, partitionNames)
+                       } else {
+                               newField, err = 
us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, field.Name, 
field.Transform, partitionNames)
+                       }
+                       if err != nil {
+                               return iceberg.PartitionSpec{}
+                       }
+                       partitionFields = append(partitionFields, newField)
+               } else if us.txn.tbl.Metadata().Version() == 1 {
+                       if rename, renamed := us.renames[field.Name]; renamed {
+                               newField, err = 
us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, rename, 
iceberg.VoidTransform{}, partitionNames)
+                       } else {
+                               newField, err = 
us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, field.Name, 
iceberg.VoidTransform{}, partitionNames)
+                       }
+                       if err != nil {
+                               return iceberg.PartitionSpec{}
+                       }
+                       partitionFields = append(partitionFields, newField)
+               }
+       }
+
+       partitionFields = append(partitionFields, us.adds...)
+
+       newSpec := iceberg.NewPartitionSpec(partitionFields...)
+       newSpecId := iceberg.InitialPartitionSpecID
+       for _, spec = range us.txn.tbl.Metadata().PartitionSpecs() {
+               if newSpec.CompatibleWith(&spec) {
+                       newSpecId = spec.ID()
+
+                       break
+               } else if newSpecId <= spec.ID() {
+                       newSpecId = spec.ID() + 1
+               }
+       }
+
+       return iceberg.NewPartitionSpecID(newSpecId, partitionFields...)
+}
+
+func (us *UpdateSpec) Commit() error {
+       updates, requirements, err := us.BuildUpdates()
+       if err != nil {
+               return err
+       }
+
+       if len(updates) == 0 {
+               return nil
+       }
+
+       return us.txn.apply(updates, requirements)
+}
+
+func (us *UpdateSpec) addField(sourceColName string, transform 
iceberg.Transform, partitionFieldName string) updateSpecOp {
+       return func() error {
+               // Finds the column in the schema and binds it with case 
sensitivity.
+               ref := iceberg.Reference(sourceColName)
+               boundTerm, err := ref.Bind(us.txn.tbl.Schema(), 
us.caseSensitive)
+               if err != nil {
+                       return err
+               }
+
+               // Validate the transform
+               outputType := boundTerm.Type()
+               if !transform.CanTransform(outputType) {
+                       return fmt.Errorf("%s cannot transform %s values from 
%s", transform.String(), outputType.String(), boundTerm.Ref().Field().Name)
+               }
+
+               // Check for duplicate partition on same source
+               key := transformKey{
+                       SourceId:  boundTerm.Ref().Field().ID,
+                       Transform: transform.String(),
+               }
+               existingPartitionField, exists := us.transformToField[key]
+               if exists && us.isDuplicatePartition(transform, 
existingPartitionField) {
+                       return fmt.Errorf("duplicate partition field for %s=%v, 
%v already exists", ref.String(), ref, existingPartitionField)
+               }
+
+               // Check if this transform was already added
+               added, exists := us.transformToAddedField[key]
+               if exists {
+                       return fmt.Errorf("already added partition: %s ", 
added.Name)
+               }
+
+               // Create the new partition field and Check for name collisions
+               // with existing fields
+               newField, err := us.partitionField(key, partitionFieldName)
+               if err != nil {
+                       return err
+               }
+               if _, exists = us.nameToAddedField[newField.Name]; exists {
+                       return fmt.Errorf("already added partition field with 
name: %s", newField.Name)
+               }
+
+               // Handle special case for time transforms
+               if _, isTimeTransform := 
newField.Transform.(iceberg.TimeTransform); isTimeTransform {
+                       if existingTimeField, exists := 
us.addedTimeFields[newField.SourceID]; exists {
+                               return fmt.Errorf("cannot add time partition 
field: %s conflicts with %s", newField.Name, existingTimeField.Name)
+                       }
+                       us.addedTimeFields[newField.SourceID] = newField
+               }
+               us.transformToAddedField[key] = newField
+
+               // If name matches an existing field, rename it if it's VOID 
transform
+               existingPartitionField, exists = us.nameToField[newField.Name]
+               if _, inDelete := us.deletes[existingPartitionField.FieldID]; 
exists && !inDelete {
+                       if _, isVoidTransform := 
existingPartitionField.Transform.(iceberg.VoidTransform); isVoidTransform {
+                               if err := us.renameField(
+                                       existingPartitionField.Name,
+                                       fmt.Sprintf("%s_%d", 
existingPartitionField.Name, existingPartitionField.FieldID),
+                               )(); err != nil {
+                                       return err
+                               }
+                       } else {
+                               return fmt.Errorf("cannot add duplicate 
partition field name: %s", existingPartitionField.Name)
+                       }
+               }
+
+               // Register the new field
+               us.nameToAddedField[newField.Name] = newField
+               us.adds = append(us.adds, newField)
+
+               return nil
+       }
+}
+
+// removeField returns the deferred operation behind RemoveField. It refuses
+// fields added by this builder, fields already renamed, and unknown names.
+// Otherwise it marks the current-spec field called name as deleted.
+func (us *UpdateSpec) removeField(name string) updateSpecOp {
+       return func() error {
+               _, isAdded := us.nameToAddedField[name]
+               _, isRenamed := us.renames[name]
+               field, isKnown := us.nameToField[name]
+
+               switch {
+               case isAdded:
+                       return fmt.Errorf("cannot remove newly added field %s", name)
+               case isRenamed:
+                       return fmt.Errorf("cannot rename and delete field %s", name)
+               case !isKnown:
+                       return fmt.Errorf("cannot find partition field %s", name)
+               }
+
+               us.deletes[field.FieldID] = true
+
+               return nil
+       }
+}
+
+func (us *UpdateSpec) renameField(name string, newName string) updateSpecOp {
+       return func() error {
+               existingField, exists := us.nameToField[newName]
+               _, isVoidTransform := 
existingField.Transform.(iceberg.VoidTransform)
+               if exists && isVoidTransform {
+                       return us.renameField(
+                               name,
+                               fmt.Sprintf("%s_%d", name, 
existingField.FieldID),
+                       )()
+               }
+               if _, added := us.nameToAddedField[name]; added {
+                       return errors.New("cannot rename recently added 
partitions")
+               }
+
+               field, exists := us.nameToField[name]
+               if !exists {
+                       return fmt.Errorf("cannot find partition field: %s", 
name)
+               }
+               if _, deleted := us.deletes[field.FieldID]; deleted {
+                       return fmt.Errorf("cannot delete and rename partition 
field: %s", name)
+               }
+               us.renames[name] = newName
+
+               return nil
+       }
+}
+
+func (us *UpdateSpec) partitionField(key transformKey, name string) 
(iceberg.PartitionField, error) {
+       if us.txn.tbl.Metadata().Version() == 2 {
+               sourceId, transform := key.SourceId, key.Transform
+               historicalFields := make([]iceberg.PartitionField, 0)
+               for _, spec := range us.txn.tbl.Metadata().PartitionSpecs() {
+                       historicalFields = slices.AppendSeq(historicalFields, 
spec.Fields())
+               }
+               for _, field := range historicalFields {
+                       if field.SourceID == sourceId && 
field.Transform.String() == transform {
+                               if len(name) > 0 && field.Name == name {
+                                       return iceberg.PartitionField{
+                                               SourceID:  sourceId,
+                                               FieldID:   field.FieldID,
+                                               Name:      name,
+                                               Transform: field.Transform,
+                                       }, nil
+                               }
+                       }
+               }
+       }
+       newFieldId := us.newFieldId()
+       transform, _ := iceberg.ParseTransform(key.Transform)
+       if name == "" {
+               tmpField := iceberg.PartitionField{
+                       SourceID:  key.SourceId,
+                       FieldID:   newFieldId,
+                       Name:      "",
+                       Transform: transform,
+               }
+               var err error
+               name, err = 
iceberg.GeneratePartitionFieldName(us.txn.tbl.Schema(), tmpField)
+               if err != nil {
+                       return iceberg.PartitionField{}, err
+               }
+       }
+
+       return iceberg.PartitionField{
+               SourceID:  key.SourceId,
+               FieldID:   newFieldId,
+               Name:      name,
+               Transform: transform,
+       }, nil
+}
+
+// newFieldId allocates the next partition field ID by advancing the builder's
+// counter, and returns it.
+func (us *UpdateSpec) newFieldId() int {
+       us.lastAssignedFieldId++
+
+       return us.lastAssignedFieldId
+}
+
+// isDuplicatePartition reports whether adding transform would duplicate
+// partitionField. That is the case when the field has not been removed and
+// its transform equals transform.
+func (us *UpdateSpec) isDuplicatePartition(transform iceberg.Transform, partitionField iceberg.PartitionField) bool {
+       if _, removed := us.deletes[partitionField.FieldID]; removed {
+               return false
+       }
+
+       return transform.Equals(partitionField.Transform)
+}
+
+func (us *UpdateSpec) checkAndAddPartitionName(schema *iceberg.Schema, name 
string, sourceId int, partitionNames map[string]bool) error {
+       field, found := schema.FindFieldByName(name)
+       if found && field.ID != sourceId {
+               return fmt.Errorf("cannot create partition from name that 
exists in schema %s", name)
+       }
+       if _, exists := partitionNames[name]; exists {
+               return fmt.Errorf("partition name has to be unique: %s", name)
+       }
+       partitionNames[name] = true
+
+       return nil
+}
+
+// addNewField validates and records name via checkAndAddPartitionName, then
+// returns the PartitionField assembled from the given parts.
+func (us *UpdateSpec) addNewField(schema *iceberg.Schema, sourceId int, fieldId int, name string, transform iceberg.Transform, partitionNames map[string]bool) (iceberg.PartitionField, error) {
+       if err := us.checkAndAddPartitionName(schema, name, sourceId, partitionNames); err != nil {
+               return iceberg.PartitionField{}, err
+       }
+
+       return iceberg.PartitionField{
+               FieldID:   fieldId,
+               SourceID:  sourceId,
+               Transform: transform,
+               Name:      name,
+       }, nil
+}
+
+// isNewPartitionSpec reports whether none of the table's partition specs has
+// the ID newSpecId.
+func (us *UpdateSpec) isNewPartitionSpec(newSpecId int) bool {
+       hasID := func(s iceberg.PartitionSpec) bool { return s.ID() == newSpecId }
+
+       return slices.IndexFunc(us.txn.tbl.Metadata().PartitionSpecs(), hasID) < 0
+}
diff --git a/table/update_spec_test.go b/table/update_spec_test.go
new file mode 100644
index 0000000..0fc102a
--- /dev/null
+++ b/table/update_spec_test.go
@@ -0,0 +1,491 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/table"
+       "github.com/stretchr/testify/assert"
+)
+
+// testSchema has three top-level primitive columns and a nested "address"
+// struct, giving source IDs 1-7 for the partition tests.
+var testSchema = iceberg.NewSchema(1,
+       iceberg.NestedField{ID: 1, Name: "id", Required: true, Type: iceberg.PrimitiveTypes.Int64},
+       iceberg.NestedField{ID: 2, Name: "name", Required: true, Type: iceberg.PrimitiveTypes.String},
+       iceberg.NestedField{ID: 3, Name: "ts", Required: false, Type: iceberg.PrimitiveTypes.Timestamp},
+       iceberg.NestedField{ID: 4, Name: "address", Required: false, Type: &iceberg.StructType{
+               FieldList: []iceberg.NestedField{
+                       {ID: 5, Name: "street", Type: iceberg.PrimitiveTypes.String, Required: true},
+                       {ID: 6, Name: "city", Type: iceberg.PrimitiveTypes.String, Required: true},
+                       {ID: 7, Name: "zip_code", Type: iceberg.PrimitiveTypes.Int64, Required: false},
+               },
+       }},
+)
+
+// partitionSpec partitions by identity(id) and void(address.street), using
+// field IDs 1000 and 1001.
+var partitionSpec = iceberg.NewPartitionSpec(
+       iceberg.PartitionField{
+               SourceID:  1,
+               FieldID:   iceberg.PartitionDataIDStart,
+               Name:      "id_identity",
+               Transform: iceberg.IdentityTransform{},
+       },
+       iceberg.PartitionField{
+               SourceID:  5,
+               FieldID:   iceberg.PartitionDataIDStart + 1,
+               Name:      "street_void",
+               Transform: iceberg.VoidTransform{},
+       })
+
+// Metadata for an unpartitioned and a partitioned table. A construction error
+// panics at package initialization instead of being discarded, so a broken
+// fixture fails loudly rather than as confusing downstream test failures.
+var testMetadataNonPartitioned, testMetadataPartitioned = func() (table.Metadata, table.Metadata) {
+       nonPartitioned, err := table.NewMetadata(testSchema, iceberg.UnpartitionedSpec, table.UnsortedSortOrder, "", nil)
+       if err != nil {
+               panic(err)
+       }
+       partitioned, err := table.NewMetadata(testSchema, &partitionSpec, table.UnsortedSortOrder, "", nil)
+       if err != nil {
+               panic(err)
+       }
+
+       return nonPartitioned, partitioned
+}()
+
+var testNonPartitionedTable = table.New([]string{"non_partitioned"}, testMetadataNonPartitioned, "", nil, nil)
+
+var testPartitionedTable = table.New([]string{"partitioned"}, testMetadataPartitioned, "", nil, nil)
+
+func TestUpdateSpecAddField(t *testing.T) {
+       var txn *table.Transaction
+
+       t.Run("add partition fields", func(t *testing.T) {
+               txn = testPartitionedTable.NewTransaction()
+               specUpdate := table.NewUpdateSpec(txn, false)
+
+               updates, reqs, err := specUpdate.
+                       AddField("ts", iceberg.YearTransform{}, 
"year_transform").
+                       AddField("address.Zip_cOdE", 
iceberg.BucketTransform{NumBuckets: 5}, "zipcode_bucket").
+                       BuildUpdates()
+               assert.NoError(t, err)
+               assert.NotNil(t, updates)
+               assert.NotNil(t, reqs)
+
+               newSpec := specUpdate.Apply()
+               assert.NotNil(t, newSpec)
+               assert.Equal(t, 1, newSpec.ID())
+               assert.Equal(t, 1003, newSpec.LastAssignedFieldID())
+               assert.Equal(t, 4, newSpec.NumFields())
+               assert.Equal(t, "id_identity", 
newSpec.FieldsBySourceID(1)[0].Name)
+               assert.Equal(t, "street_void", 
newSpec.FieldsBySourceID(5)[0].Name)
+
+               addedField := newSpec.FieldsBySourceID(3)[0]
+               assert.Equal(t, 3, addedField.SourceID)
+               assert.Equal(t, 1002, addedField.FieldID)
+               assert.Equal(t, "year_transform", addedField.Name)
+               assert.Equal(t, iceberg.YearTransform{}, addedField.Transform)
+
+               addedField = newSpec.FieldsBySourceID(7)[0]
+               assert.Equal(t, 7, addedField.SourceID)
+               assert.Equal(t, 1003, addedField.FieldID)
+               assert.Equal(t, "zipcode_bucket", addedField.Name)
+               assert.Equal(t, iceberg.BucketTransform{NumBuckets: 5}, 
addedField.Transform)
+       })
+
+       t.Run("add partition field case sensitive", func(t *testing.T) {
+               txn = testNonPartitionedTable.NewTransaction()
+               updates := table.NewUpdateSpec(txn, true)
+               _, _, err := updates.
+                       AddField("NaMe", iceberg.VoidTransform{}, "name_void").
+                       BuildUpdates()
+               assert.Error(t, err)
+               assert.ErrorContains(t, err, "invalid schema: could not bind 
reference")
+       })
+
+       t.Run("add invalid partition transform field", func(t *testing.T) {
+               txn = testNonPartitionedTable.NewTransaction()
+               specUpdate := table.NewUpdateSpec(txn, true)
+               updates, reqs, err := specUpdate.
+                       AddField("name", iceberg.YearTransform{}, "name_year").
+                       BuildUpdates()
+               assert.Error(t, err)
+               assert.ErrorContains(t, err, "year cannot transform string 
values from name")
+               assert.Nil(t, updates)
+               assert.Nil(t, reqs)
+       })
+
+       t.Run("add duplicate partition field", func(t *testing.T) {
+               txn = testPartitionedTable.NewTransaction()
+               specUpdate := table.NewUpdateSpec(txn, true)
+               updates, reqs, err := specUpdate.
+                       AddField("id", iceberg.IdentityTransform{}, 
"id_transform").
+                       BuildUpdates()
+               assert.Error(t, err)
+               assert.ErrorContains(t, err, "duplicate partition field")
+               assert.Nil(t, updates)
+               assert.Nil(t, reqs)
+       })
+
+       t.Run("add already added partition field", func(t *testing.T) {
+               txn = testPartitionedTable.NewTransaction()
+               updateSpec := table.NewUpdateSpec(txn, true)
+
+               updates, reqs, err := updateSpec.
+                       AddField("ts", iceberg.YearTransform{}, 
"year_transform_1").
+                       AddField("ts", iceberg.YearTransform{}, 
"year_transform_2").
+                       BuildUpdates()
+               assert.Error(t, err)
+               assert.ErrorContains(t, err, "already added partition")
+               assert.Nil(t, updates)
+               assert.Nil(t, reqs)
+       })
+
+       t.Run("add duplicate partition field name", func(t *testing.T) {
+               txn = testPartitionedTable.NewTransaction()
+               specUpdate := table.NewUpdateSpec(txn, true)
+
+               updates, reqs, err := specUpdate.
+                       AddField("ts", iceberg.YearTransform{}, 
"year_transform_1").
+                       AddField("ts", iceberg.MonthTransform{}, 
"year_transform_1").
+                       BuildUpdates()
+
+               assert.Error(t, err)
+               assert.ErrorContains(t, err, "already added partition field 
with name")
+               assert.Nil(t, updates)
+               assert.Nil(t, reqs)
+       })
+
+       t.Run("add conflicted time transform partition field", func(t 
*testing.T) {
+               txn = testNonPartitionedTable.NewTransaction()
+               updateSpec := table.NewUpdateSpec(txn, true)
+
+               updates, reqs, err := updateSpec.
+                       AddField("ts", iceberg.YearTransform{}, "ts_year").
+                       AddField("ts", iceberg.MonthTransform{}, "ts_month").
+                       BuildUpdates()
+
+               assert.Error(t, err)
+               assert.ErrorContains(t, err, "cannot add time partition field")
+               assert.Nil(t, updates)
+               assert.Nil(t, reqs)
+       })
+
+       t.Run("add duplicate partition field", func(t *testing.T) {
+               txn = testPartitionedTable.NewTransaction()
+               updateSpec := table.NewUpdateSpec(txn, true)
+
+               updates, reqs, err := updateSpec.
+                       AddField("ts", iceberg.YearTransform{}, "id_identity").
+                       BuildUpdates()
+               assert.Error(t, err)
+               assert.ErrorContains(t, err, "cannot add duplicate partition 
field name")
+               assert.Nil(t, updates)
+               assert.Nil(t, reqs)
+       })
+
+       t.Run("add duplicate partition field name with void transform", func(t 
*testing.T) {
+               txn = testPartitionedTable.NewTransaction()
+               specUpdate := table.NewUpdateSpec(txn, true)
+
+               updates, reqs, err := specUpdate.
+                       AddField("ts", iceberg.VoidTransform{}, "street_void").
+                       BuildUpdates()
+               assert.NoError(t, err)
+               assert.NotNil(t, updates)
+               assert.NotNil(t, reqs)
+
+               newSpec := specUpdate.Apply()
+               assert.NotNil(t, newSpec)
+               assert.Equal(t, "street_void_1001", 
newSpec.FieldsBySourceID(5)[0].Name)
+       })
+}
+
+// TestUpdateSpecAddIdentityField checks that AddIdentity appends identity
+// partition fields named after their source column, assigns them the next
+// free field IDs, and keeps the table's existing partition fields intact.
+func TestUpdateSpecAddIdentityField(t *testing.T) {
+       t.Run("add identity partition fields", func(t *testing.T) {
+               // Each subtest owns its transaction; nothing is shared via an
+               // outer variable, so subtests stay independent.
+               txn := testPartitionedTable.NewTransaction()
+               specUpdate := table.NewUpdateSpec(txn, false)
+               updates, reqs, err := specUpdate.
+                       AddIdentity("ts").
+                       AddIdentity("name").
+                       BuildUpdates()
+               assert.NoError(t, err)
+               assert.NotNil(t, updates)
+               assert.NotNil(t, reqs)
+
+               newSpec := specUpdate.Apply()
+               assert.NotNil(t, newSpec)
+               assert.Equal(t, 1, newSpec.ID())
+               assert.Equal(t, 1003, newSpec.LastAssignedFieldID())
+               assert.Equal(t, 4, newSpec.NumFields())
+
+               // Guard every [0] index with NotEmpty so a regression shows up
+               // as a failed assertion rather than an index-out-of-range panic.
+               if fields := newSpec.FieldsBySourceID(1); assert.NotEmpty(t, fields) {
+                       assert.Equal(t, "id_identity", fields[0].Name)
+               }
+               if fields := newSpec.FieldsBySourceID(5); assert.NotEmpty(t, fields) {
+                       assert.Equal(t, "street_void", fields[0].Name)
+               }
+
+               if fields := newSpec.FieldsBySourceID(3); assert.NotEmpty(t, fields) {
+                       addedField := fields[0]
+                       assert.Equal(t, 3, addedField.SourceID)
+                       assert.Equal(t, 1002, addedField.FieldID)
+                       assert.Equal(t, "ts", addedField.Name)
+                       assert.Equal(t, iceberg.IdentityTransform{}, addedField.Transform)
+               }
+
+               if fields := newSpec.FieldsBySourceID(2); assert.NotEmpty(t, fields) {
+                       addedField := fields[0]
+                       assert.Equal(t, 2, addedField.SourceID)
+                       assert.Equal(t, 1003, addedField.FieldID)
+                       assert.Equal(t, "name", addedField.Name)
+                       assert.Equal(t, iceberg.IdentityTransform{}, addedField.Transform)
+               }
+       })
+}
+
+// TestUpdateSpecRenameField covers renaming existing partition fields plus the
+// rejected cases: renaming a field added in the same update, renaming a field
+// that does not exist, and renaming a field already removed in the same update.
+func TestUpdateSpecRenameField(t *testing.T) {
+       t.Run("rename partition fields", func(t *testing.T) {
+               txn := testPartitionedTable.NewTransaction()
+               updateSpec := table.NewUpdateSpec(txn, false)
+               updates, reqs, err := updateSpec.
+                       RenameField("id_identity", "new_id_identity").
+                       RenameField("street_void", "new_street_void").
+                       BuildUpdates()
+               assert.NoError(t, err)
+               assert.NotNil(t, updates)
+               assert.NotNil(t, reqs)
+
+               newSpec := updateSpec.Apply()
+               assert.NotNil(t, newSpec)
+               assert.Equal(t, 1, newSpec.ID())
+               // Guard the [0] index so a missing field fails instead of panicking.
+               if fields := newSpec.FieldsBySourceID(1); assert.NotEmpty(t, fields) {
+                       assert.Equal(t, "new_id_identity", fields[0].Name)
+               }
+               if fields := newSpec.FieldsBySourceID(5); assert.NotEmpty(t, fields) {
+                       assert.Equal(t, "new_street_void", fields[0].Name)
+               }
+       })
+
+       t.Run("rename recently added partition", func(t *testing.T) {
+               txn := testPartitionedTable.NewTransaction()
+               updateSpec := table.NewUpdateSpec(txn, false)
+
+               updates, reqs, err := updateSpec.
+                       AddField("ts", iceberg.YearTransform{}, "year_transform").
+                       RenameField("year_transform", "new_year_transform").
+                       BuildUpdates()
+               // ErrorContains already fails when err is nil.
+               assert.ErrorContains(t, err, "cannot rename recently added partitions")
+               assert.Nil(t, updates)
+               assert.Nil(t, reqs)
+       })
+
+       t.Run("rename a partition field that doesn't exist", func(t *testing.T) {
+               txn := testPartitionedTable.NewTransaction()
+               updateSpec := table.NewUpdateSpec(txn, false)
+
+               updates, reqs, err := updateSpec.
+                       RenameField("non_exist_field", "new_non_exist_field").
+                       BuildUpdates()
+               assert.ErrorContains(t, err, "cannot find partition field")
+               assert.Nil(t, updates)
+               assert.Nil(t, reqs)
+       })
+
+       t.Run("rename a partition field deleted in the same transaction", func(t *testing.T) {
+               txn := testPartitionedTable.NewTransaction()
+               updateSpec := table.NewUpdateSpec(txn, false)
+
+               updates, reqs, err := updateSpec.
+                       RemoveField("id_identity").
+                       RenameField("id_identity", "new_id_identity").
+                       BuildUpdates()
+               assert.ErrorContains(t, err, "cannot delete and rename partition field")
+               assert.Nil(t, updates)
+               assert.Nil(t, reqs)
+       })
+}
+
+// TestUpdateSpecRemoveField covers removing existing partition fields plus the
+// rejected cases: removing a field added in the same update, removing a field
+// renamed in the same update, and removing a field that does not exist.
+func TestUpdateSpecRemoveField(t *testing.T) {
+       t.Run("remove existing partition fields", func(t *testing.T) {
+               txn := testPartitionedTable.NewTransaction()
+               updateSpec := table.NewUpdateSpec(txn, false)
+
+               updates, reqs, err := updateSpec.
+                       RemoveField("street_void").
+                       RemoveField("id_identity").
+                       BuildUpdates()
+               assert.NoError(t, err)
+               assert.NotNil(t, updates)
+               assert.NotNil(t, reqs)
+
+               newSpec := updateSpec.Apply()
+               assert.NotNil(t, newSpec)
+               assert.Equal(t, 1, newSpec.ID())
+               // A spec with no fields reports PartitionDataIDStart-1 as its
+               // last assigned field ID.
+               assert.Equal(t, iceberg.PartitionDataIDStart-1, newSpec.LastAssignedFieldID())
+               assert.Equal(t, 0, newSpec.NumFields())
+               assert.True(t, newSpec.IsUnpartitioned())
+       })
+
+       t.Run("remove newly added partition field", func(t *testing.T) {
+               txn := testPartitionedTable.NewTransaction()
+               updateSpec := table.NewUpdateSpec(txn, false)
+
+               updates, reqs, err := updateSpec.
+                       AddField("ts", iceberg.YearTransform{}, "year_transform").
+                       RemoveField("year_transform").
+                       BuildUpdates()
+               // ErrorContains already fails when err is nil.
+               assert.ErrorContains(t, err, "cannot remove newly added field")
+               assert.Nil(t, updates)
+               assert.Nil(t, reqs)
+       })
+
+       t.Run("remove renamed partition field", func(t *testing.T) {
+               txn := testPartitionedTable.NewTransaction()
+               updateSpec := table.NewUpdateSpec(txn, false)
+
+               updates, reqs, err := updateSpec.
+                       RenameField("id_identity", "new_id_identity").
+                       RemoveField("id_identity").
+                       BuildUpdates()
+               assert.ErrorContains(t, err, "cannot rename and delete field")
+               assert.Nil(t, updates)
+               assert.Nil(t, reqs)
+       })
+
+       t.Run("remove partition field that doesn't exist", func(t *testing.T) {
+               txn := testPartitionedTable.NewTransaction()
+               updateSpec := table.NewUpdateSpec(txn, false)
+
+               updates, reqs, err := updateSpec.
+                       RemoveField("non_exist_field").
+                       BuildUpdates()
+               assert.ErrorContains(t, err, "cannot find partition field")
+               assert.Nil(t, updates)
+               assert.Nil(t, reqs)
+       })
+}
+
+// TestUpdateSpecBuildChanges checks that adding, removing, or renaming
+// partition fields each produce an add-spec update followed by a
+// set-default-spec update, guarded by a single
+// "assert-last-assigned-partition-id" requirement.
+func TestUpdateSpecBuildChanges(t *testing.T) {
+       t.Run("build changes on added partition fields", func(t *testing.T) {
+               txn := testNonPartitionedTable.NewTransaction()
+               specUpdate := table.NewUpdateSpec(txn, false)
+
+               updates, reqs, err := specUpdate.
+                       AddField("ts", iceberg.YearTransform{}, "year_transform").
+                       AddField("address.zip_code", iceberg.BucketTransform{NumBuckets: 5}, "zipcode_bucket").
+                       BuildUpdates()
+               assert.NoError(t, err)
+               assert.NotNil(t, updates)
+               assert.NotNil(t, reqs)
+
+               // Index only after the length check passes; assert.Len does not
+               // stop the test, so unguarded indexing could panic.
+               if assert.Len(t, updates, 2) {
+                       assert.Equal(t, table.UpdateAddSpec, updates[0].Action())
+                       assert.Equal(t, table.UpdateSetDefaultSpec, updates[1].Action())
+               }
+               if assert.Len(t, reqs, 1) {
+                       assert.Equal(t, "assert-last-assigned-partition-id", reqs[0].GetType())
+               }
+       })
+
+       t.Run("build changes on removed partition field", func(t *testing.T) {
+               txn := testPartitionedTable.NewTransaction()
+               specUpdate := table.NewUpdateSpec(txn, false)
+
+               updates, reqs, err := specUpdate.
+                       RemoveField("street_void").
+                       BuildUpdates()
+               assert.NoError(t, err)
+               assert.NotNil(t, updates)
+               assert.NotNil(t, reqs)
+
+               if assert.Len(t, updates, 2) {
+                       assert.Equal(t, table.UpdateAddSpec, updates[0].Action())
+                       assert.Equal(t, table.UpdateSetDefaultSpec, updates[1].Action())
+               }
+               if assert.Len(t, reqs, 1) {
+                       assert.Equal(t, "assert-last-assigned-partition-id", reqs[0].GetType())
+               }
+       })
+
+       t.Run("build changes on renamed partition field", func(t *testing.T) {
+               txn := testPartitionedTable.NewTransaction()
+               specUpdate := table.NewUpdateSpec(txn, false)
+
+               updates, reqs, err := specUpdate.
+                       RenameField("street_void", "new_street_void").
+                       BuildUpdates()
+               assert.NoError(t, err)
+
+               if assert.Len(t, updates, 2) {
+                       assert.Equal(t, table.UpdateAddSpec, updates[0].Action())
+                       assert.Equal(t, table.UpdateSetDefaultSpec, updates[1].Action())
+               }
+               if assert.Len(t, reqs, 1) {
+                       assert.Equal(t, "assert-last-assigned-partition-id", reqs[0].GetType())
+               }
+       })
+}
+
+// TestUpdateSpecCommit checks that Commit applies the spec changes to the
+// transaction's staged table, surfaces build errors, and succeeds when there
+// is nothing to change.
+func TestUpdateSpecCommit(t *testing.T) {
+       t.Run("test commit apply changes on transaction", func(t *testing.T) {
+               txn := testPartitionedTable.NewTransaction()
+
+               specUpdate := table.NewUpdateSpec(txn, false)
+
+               err := specUpdate.
+                       AddField("address.city", iceberg.TruncateTransform{Width: 3}, "").
+                       AddIdentity("address.zip_code").
+                       RenameField("street_void", "new_street_void").
+                       RemoveField("id_identity").
+                       Commit()
+               assert.NoError(t, err)
+
+               stagedTbl, err := txn.StagedTable()
+               // The staged table cannot be relied on when err is non-nil, so
+               // stop here instead of calling methods on it.
+               if !assert.NoError(t, err) {
+                       return
+               }
+
+               currSpec := stagedTbl.Spec()
+               assert.NotNil(t, currSpec)
+               assert.Equal(t, 1, currSpec.ID())
+               assert.Equal(t, 1003, currSpec.LastAssignedFieldID())
+               assert.Equal(t, 3, currSpec.NumFields())
+               // Guard each [0] index so a missing field fails instead of panicking.
+               if fields := currSpec.FieldsBySourceID(5); assert.NotEmpty(t, fields) {
+                       assert.Equal(t, "new_street_void", fields[0].Name)
+               }
+               assert.Equal(t, []iceberg.PartitionField(nil), currSpec.FieldsBySourceID(1))
+
+               if fields := currSpec.FieldsBySourceID(6); assert.NotEmpty(t, fields) {
+                       addedField := fields[0]
+                       assert.Equal(t, 6, addedField.SourceID)
+                       assert.Equal(t, 1002, addedField.FieldID)
+                       assert.Equal(t, "address.city_trunc_3", addedField.Name)
+                       assert.Equal(t, iceberg.TruncateTransform{Width: 3}, addedField.Transform)
+               }
+
+               if fields := currSpec.FieldsBySourceID(7); assert.NotEmpty(t, fields) {
+                       addedIdentity := fields[0]
+                       assert.Equal(t, 7, addedIdentity.SourceID)
+                       assert.Equal(t, 1003, addedIdentity.FieldID)
+                       assert.Equal(t, "address.zip_code", addedIdentity.Name)
+                       assert.Equal(t, iceberg.IdentityTransform{}, addedIdentity.Transform)
+               }
+       })
+
+       t.Run("test commit with build errors", func(t *testing.T) {
+               txn := testPartitionedTable.NewTransaction()
+
+               specUpdate := table.NewUpdateSpec(txn, false)
+
+               err := specUpdate.
+                       AddField("id", iceberg.IdentityTransform{}, "id_transform").
+                       Commit()
+               // ErrorContains already fails when err is nil.
+               assert.ErrorContains(t, err, "duplicate partition field")
+       })
+
+       t.Run("test commit with empty updates", func(t *testing.T) {
+               txn := testPartitionedTable.NewTransaction()
+
+               specUpdate := table.NewUpdateSpec(txn, false)
+
+               err := specUpdate.Commit()
+               assert.NoError(t, err)
+       })
+}
diff --git a/transforms.go b/transforms.go
index f59b3f8..8c3e0aa 100644
--- a/transforms.go
+++ b/transforms.go
@@ -550,12 +550,12 @@ func (t TruncateTransform) Project(name string, pred 
BoundPredicate) (UnboundPre
 
 var epochTM = time.Unix(0, 0).UTC()
 
-type timeTransform interface {
+// TimeTransform is a Transform that can build, for a given source Type, a
+// function mapping each input value to an optional int32 result.
+type TimeTransform interface {
       Transform
       Transformer(Type) (func(any) Optional[int32], error)
 }
 
-func canTransformTime(t timeTransform, sourceType Type) bool {
+func canTransformTime(t TimeTransform, sourceType Type) bool {
        switch sourceType.(type) {
        case DateType, TimestampType, TimestampTzType:
                return true
@@ -564,7 +564,7 @@ func canTransformTime(t timeTransform, sourceType Type) 
bool {
        }
 }
 
-func projectTimeTransform(t timeTransform, name string, pred BoundPredicate) 
(UnboundPredicate, error) {
+func projectTimeTransform(t TimeTransform, name string, pred BoundPredicate) 
(UnboundPredicate, error) {
        if _, ok := pred.Term().(*BoundTransform); ok {
                return projectTransformPredicate(t, name, pred)
        }


Reply via email to