This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 194a45f feat(transaction): Add initial support for update spec in
transaction API (#467)
194a45f is described below
commit 194a45f88d6c9de41a2cbfe5453ec839fe7257ee
Author: Leon Lin <[email protected]>
AuthorDate: Mon Jul 14 12:38:48 2025 -0700
feat(transaction): Add initial support for update spec in transaction API
(#467)
### Description
* Add support for update spec in transaction API
* Reference:
https://github.com/apache/iceberg-python/blob/main/pyiceberg/table/update/spec.py
### Usage
```
specUpdates := txn.UpdateSpec(false) // false: bind column names case-insensitively
err := specUpdates.
	AddField("ts", iceberg.YearTransform{}, "year_transform").
	RenameField("id_identity", "new_id_identity").
	RemoveField("field").
	Commit()
if err != nil {
	// handle the validation error
}
txn.Commit()
```
### Testing
* `update_spec_test.go`
---------
Co-authored-by: Matt Topol <[email protected]>
---
partitions.go | 6 +-
table/transaction.go | 4 +
table/update_spec.go | 401 +++++++++++++++++++++++++++++++++++++
table/update_spec_test.go | 491 ++++++++++++++++++++++++++++++++++++++++++++++
transforms.go | 6 +-
5 files changed, 902 insertions(+), 6 deletions(-)
diff --git a/partitions.go b/partitions.go
index f2923d0..03cb383 100644
--- a/partitions.go
+++ b/partitions.go
@@ -28,7 +28,7 @@ import (
)
const (
- partitionDataIDStart = 1000
+	// PartitionDataIDStart is the field ID given to the first field of a
+	// partition spec; a spec with no fields reports PartitionDataIDStart-1
+	// as its last assigned field ID.
+ PartitionDataIDStart = 1000
InitialPartitionSpecID = 0
)
@@ -199,7 +199,7 @@ func (ps PartitionSpec) String() string {
func (ps *PartitionSpec) LastAssignedFieldID() int {
if len(ps.fields) == 0 {
- return partitionDataIDStart - 1
+ return PartitionDataIDStart - 1
}
id := ps.fields[0].FieldID
@@ -286,7 +286,7 @@ func AssignFreshPartitionSpecIDs(spec *PartitionSpec, old,
fresh *Schema) (Parti
newFields = append(newFields, PartitionField{
Name: field.Name,
SourceID: freshField.ID,
- FieldID: partitionDataIDStart + pos,
+ FieldID: PartitionDataIDStart + pos,
Transform: field.Transform,
})
}
diff --git a/table/transaction.go b/table/transaction.go
index 6379089..bc43def 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -142,6 +142,10 @@ func (t *Transaction) SetProperties(props
iceberg.Properties) error {
return nil
}
+// UpdateSpec starts a partition spec evolution bound to this transaction.
+// Source column names given to the returned builder are bound against the
+// table schema case-sensitively when caseSensitive is true. Nothing is staged
+// on the transaction until the builder's Commit method is called.
+func (t *Transaction) UpdateSpec(caseSensitive bool) *UpdateSpec {
+	builder := NewUpdateSpec(t, caseSensitive)
+
+	return builder
+}
+
func (t *Transaction) AppendTable(ctx context.Context, tbl arrow.Table,
batchSize int64, snapshotProps iceberg.Properties) error {
rdr := array.NewTableReader(tbl, batchSize)
defer rdr.Release()
diff --git a/table/update_spec.go b/table/update_spec.go
new file mode 100644
index 0000000..df777d6
--- /dev/null
+++ b/table/update_spec.go
@@ -0,0 +1,401 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "errors"
+ "fmt"
+ "slices"
+
+ "github.com/apache/iceberg-go"
+)
+
+// UpdateSpec is a builder that evolves a table's partition specification
+// inside a Transaction.
+//
+// AddField, AddIdentity, RemoveField and RenameField only queue operations;
+// they are validated and applied when BuildUpdates runs the queue. Call
+// BuildUpdates to obtain the resulting updates and requirements, or Commit to
+// stage them on the owning transaction.
+type UpdateSpec struct {
+	txn           *Transaction
+	caseSensitive bool
+
+	// operations queued by the public builder methods, run in order by BuildUpdates.
+	operations []updateSpecOp
+
+	// Fields of the current spec, indexed by name and by (source ID, transform).
+	nameToField      map[string]iceberg.PartitionField
+	transformToField map[transformKey]iceberg.PartitionField
+
+	// Fields added by this update, indexed the same way; addedTimeFields holds
+	// time-transform additions keyed by source column ID (at most one each).
+	nameToAddedField      map[string]iceberg.PartitionField
+	transformToAddedField map[transformKey]iceberg.PartitionField
+	addedTimeFields       map[int]iceberg.PartitionField
+	adds                  []iceberg.PartitionField
+
+	// renames maps an existing field name to its new name; deletes holds the
+	// field IDs of existing fields being removed.
+	renames map[string]string
+	deletes map[int]bool
+
+	// lastAssignedFieldId is the highest partition field ID handed out so far.
+	lastAssignedFieldId int
+}
+
+// updateSpecOp is a deferred builder operation that reports validation
+// failures as errors.
+type updateSpecOp func() error
+
+// transformKey identifies a partition field by its source column ID and the
+// string form of its transform.
+type transformKey struct {
+	SourceId  int
+	Transform string
+}
+
+// NewUpdateSpec returns an UpdateSpec builder for the partition spec of the
+// table owned by t. caseSensitive controls how source column names are bound
+// against the table schema.
+//
+// Newly added partition fields get IDs continuing from the table's last
+// assigned partition ID, or starting at iceberg.PartitionDataIDStart when the
+// metadata does not record one.
+func NewUpdateSpec(t *Transaction, caseSensitive bool) *UpdateSpec {
+	md := t.tbl.Metadata()
+
+	byName := make(map[string]iceberg.PartitionField)
+	byTransform := make(map[transformKey]iceberg.PartitionField)
+	spec := md.PartitionSpec()
+	for field := range spec.Fields() {
+		byName[field.Name] = field
+		byTransform[transformKey{SourceId: field.SourceID, Transform: field.Transform.String()}] = field
+	}
+
+	lastAssigned := iceberg.PartitionDataIDStart - 1
+	if last := md.LastPartitionSpecID(); last != nil {
+		lastAssigned = *last
+	}
+
+	return &UpdateSpec{
+		txn:                   t,
+		caseSensitive:         caseSensitive,
+		nameToField:           byName,
+		transformToField:      byTransform,
+		nameToAddedField:      make(map[string]iceberg.PartitionField),
+		transformToAddedField: make(map[transformKey]iceberg.PartitionField),
+		addedTimeFields:       make(map[int]iceberg.PartitionField),
+		adds:                  make([]iceberg.PartitionField, 0),
+		renames:               make(map[string]string),
+		deletes:               make(map[int]bool),
+		lastAssignedFieldId:   lastAssigned,
+	}
+}
+
+// AddField queues the addition of a partition field that applies transform to
+// the column named sourceColName. An empty partitionFieldName lets the builder
+// choose the name. Validation happens when BuildUpdates or Commit runs.
+func (us *UpdateSpec) AddField(sourceColName string, transform iceberg.Transform, partitionFieldName string) *UpdateSpec {
+	return us.enqueue(us.addField(sourceColName, transform, partitionFieldName))
+}
+
+// AddIdentity queues an identity-transform partition field on sourceColName,
+// letting the builder choose its name.
+func (us *UpdateSpec) AddIdentity(sourceColName string) *UpdateSpec {
+	return us.enqueue(us.addField(sourceColName, iceberg.IdentityTransform{}, ""))
+}
+
+// RemoveField queues the removal of the existing partition field called name.
+func (us *UpdateSpec) RemoveField(name string) *UpdateSpec {
+	return us.enqueue(us.removeField(name))
+}
+
+// RenameField queues renaming the existing partition field name to newName.
+func (us *UpdateSpec) RenameField(name string, newName string) *UpdateSpec {
+	return us.enqueue(us.renameField(name, newName))
+}
+
+// enqueue appends op to the pending operations and returns us for chaining.
+func (us *UpdateSpec) enqueue(op updateSpecOp) *UpdateSpec {
+	us.operations = append(us.operations, op)
+
+	return us
+}
+
+// BuildUpdates runs the queued operations and returns the table updates and
+// requirements that make the resulting spec the table's default.
+//
+// The queue is consumed, so calling BuildUpdates again only runs operations
+// queued since the previous call. If the resulting spec is already the
+// default, empty updates and requirements are returned. A spec whose ID is not
+// yet in the table metadata is added and followed by a set-default update with
+// ID -1; an existing spec is made default by its ID. Any validation error from
+// the operations or from assembling the new spec is returned.
+func (us *UpdateSpec) BuildUpdates() ([]Update, []Requirement, error) {
+	// Consume the queue so a second call does not replay operations against
+	// state they have already mutated.
+	ops := us.operations
+	us.operations = nil
+	for _, op := range ops {
+		if err := op(); err != nil {
+			return nil, nil, err
+		}
+	}
+
+	newSpec, err := us.apply()
+	if err != nil {
+		return nil, nil, err
+	}
+
+	updates := make([]Update, 0)
+	requirements := make([]Requirement, 0)
+
+	md := us.txn.tbl.Metadata()
+	if md.DefaultPartitionSpec() == newSpec.ID() {
+		return updates, requirements, nil
+	}
+
+	if us.isNewPartitionSpec(newSpec.ID()) {
+		updates = append(updates,
+			NewAddPartitionSpecUpdate(&newSpec, false),
+			NewSetDefaultSpecUpdate(-1))
+	} else {
+		updates = append(updates, NewSetDefaultSpecUpdate(newSpec.ID()))
+	}
+
+	// Require the last assigned partition ID to be unchanged when the updates
+	// are applied. The metadata may not record one; dereferencing a nil
+	// pointer here used to panic, so the requirement is skipped in that case.
+	if lastID := md.LastPartitionSpecID(); lastID != nil {
+		requirements = append(requirements, AssertLastAssignedPartitionID(*lastID))
+	}
+
+	return updates, requirements, nil
+}
+
+// Apply returns the partition spec that results from the operations already
+// run by BuildUpdates; it does not run queued operations itself.
+//
+// If the resulting fields are invalid (for example a partition name collides
+// with a schema column or with another partition field), Apply returns the
+// zero PartitionSpec. BuildUpdates and Commit report that error instead.
+func (us *UpdateSpec) Apply() iceberg.PartitionSpec {
+	spec, err := us.apply()
+	if err != nil {
+		return iceberg.PartitionSpec{}
+	}
+
+	return spec
+}
+
+// apply assembles the resulting spec. It takes the current fields (renamed as
+// requested), drops removed fields on non-v1 tables and keeps them with a void
+// transform on v1 tables, then appends the added fields. The spec reuses the
+// ID of the first compatible existing spec, otherwise takes the next unused ID.
+func (us *UpdateSpec) apply() (iceberg.PartitionSpec, error) {
+	md := us.txn.tbl.Metadata()
+	schema := us.txn.tbl.Schema()
+	partitionNames := make(map[string]bool)
+	partitionFields := make([]iceberg.PartitionField, 0)
+
+	current := md.PartitionSpec()
+	for field := range current.Fields() {
+		transform := field.Transform
+		if _, deleted := us.deletes[field.FieldID]; deleted {
+			if md.Version() != 1 {
+				continue
+			}
+			transform = iceberg.VoidTransform{}
+		}
+
+		name := field.Name
+		if rename, renamed := us.renames[field.Name]; renamed {
+			name = rename
+		}
+
+		newField, err := us.addNewField(schema, field.SourceID, field.FieldID, name, transform, partitionNames)
+		if err != nil {
+			return iceberg.PartitionSpec{}, err
+		}
+		partitionFields = append(partitionFields, newField)
+	}
+	partitionFields = append(partitionFields, us.adds...)
+
+	candidate := iceberg.NewPartitionSpec(partitionFields...)
+	newSpecID := iceberg.InitialPartitionSpecID
+	for _, existing := range md.PartitionSpecs() {
+		if candidate.CompatibleWith(&existing) {
+			newSpecID = existing.ID()
+
+			break
+		}
+		if newSpecID <= existing.ID() {
+			newSpecID = existing.ID() + 1
+		}
+	}
+
+	return iceberg.NewPartitionSpecID(newSpecID, partitionFields...), nil
+}
+
+// Commit runs BuildUpdates and stages the resulting updates and requirements
+// on the owning transaction. It returns any BuildUpdates error, and does
+// nothing when BuildUpdates produces no updates.
+func (us *UpdateSpec) Commit() error {
+	updates, requirements, err := us.BuildUpdates()
+	switch {
+	case err != nil:
+		return err
+	case len(updates) == 0:
+		return nil
+	default:
+		return us.txn.apply(updates, requirements)
+	}
+}
+
+// addField returns the deferred operation behind AddField and AddIdentity.
+//
+// When run, the operation:
+//   - binds sourceColName against the table schema (honouring caseSensitive);
+//   - checks that transform accepts the column's type;
+//   - rejects an existing, non-deleted field with the same source and
+//     transform, and a (source, transform) pair already added in this update;
+//   - rejects a name already used by a field added in this update;
+//   - allows at most one time transform per source column;
+//   - moves an existing void field with the same name aside by renaming it
+//     to "<name>_<fieldID>", and rejects a collision with any other live field.
+func (us *UpdateSpec) addField(sourceColName string, transform iceberg.Transform, partitionFieldName string) updateSpecOp {
+	return func() error {
+		// Finds the column in the schema and binds it with case sensitivity.
+		ref := iceberg.Reference(sourceColName)
+		boundTerm, err := ref.Bind(us.txn.tbl.Schema(), us.caseSensitive)
+		if err != nil {
+			return err
+		}
+
+		// Validate the transform. Despite its name, outputType is the source
+		// column's type, i.e. the transform's input.
+		outputType := boundTerm.Type()
+		if !transform.CanTransform(outputType) {
+			return fmt.Errorf("%s cannot transform %s values from %s", transform.String(), outputType.String(), boundTerm.Ref().Field().Name)
+		}
+
+		// Check for duplicate partition on same source
+		key := transformKey{
+			SourceId:  boundTerm.Ref().Field().ID,
+			Transform: transform.String(),
+		}
+		existingPartitionField, exists := us.transformToField[key]
+		if exists && us.isDuplicatePartition(transform, existingPartitionField) {
+			return fmt.Errorf("duplicate partition field for %s=%v, %v already exists", ref.String(), ref, existingPartitionField)
+		}
+
+		// Check if this transform was already added
+		added, exists := us.transformToAddedField[key]
+		if exists {
+			return fmt.Errorf("already added partition: %s ", added.Name)
+		}
+
+		// Create the new partition field and check for name collisions with
+		// fields added earlier in this update. NOTE(review): partitionField may
+		// consume a field ID even when this check then fails.
+		newField, err := us.partitionField(key, partitionFieldName)
+		if err != nil {
+			return err
+		}
+		if _, exists = us.nameToAddedField[newField.Name]; exists {
+			return fmt.Errorf("already added partition field with name: %s", newField.Name)
+		}
+
+		// Handle special case for time transforms: only one per source column.
+		if _, isTimeTransform := newField.Transform.(iceberg.TimeTransform); isTimeTransform {
+			if existingTimeField, exists := us.addedTimeFields[newField.SourceID]; exists {
+				return fmt.Errorf("cannot add time partition field: %s conflicts with %s", newField.Name, existingTimeField.Name)
+			}
+			us.addedTimeFields[newField.SourceID] = newField
+		}
+		// NOTE(review): this and addedTimeFields are recorded before the
+		// name-collision check below, so a failing add still leaves them set.
+		us.transformToAddedField[key] = newField
+
+		// If name matches an existing field, rename it if it's VOID transform
+		existingPartitionField, exists = us.nameToField[newField.Name]
+		if _, inDelete := us.deletes[existingPartitionField.FieldID]; exists && !inDelete {
+			if _, isVoidTransform := existingPartitionField.Transform.(iceberg.VoidTransform); isVoidTransform {
+				if err := us.renameField(
+					existingPartitionField.Name,
+					fmt.Sprintf("%s_%d", existingPartitionField.Name, existingPartitionField.FieldID),
+				)(); err != nil {
+					return err
+				}
+			} else {
+				return fmt.Errorf("cannot add duplicate partition field name: %s", existingPartitionField.Name)
+			}
+		}
+
+		// Register the new field
+		us.nameToAddedField[newField.Name] = newField
+		us.adds = append(us.adds, newField)
+
+		return nil
+	}
+}
+
+// removeField returns the deferred operation behind RemoveField. It rejects
+// fields added or renamed in this update and names absent from the current
+// spec; otherwise it marks the field's ID for deletion.
+func (us *UpdateSpec) removeField(name string) updateSpecOp {
+	return func() error {
+		_, justAdded := us.nameToAddedField[name]
+		_, beingRenamed := us.renames[name]
+		field, known := us.nameToField[name]
+
+		switch {
+		case justAdded:
+			return fmt.Errorf("cannot remove newly added field %s", name)
+		case beingRenamed:
+			return fmt.Errorf("cannot rename and delete field %s", name)
+		case !known:
+			return fmt.Errorf("cannot find partition field %s", name)
+		}
+
+		us.deletes[field.FieldID] = true
+
+		return nil
+	}
+}
+
+// renameField returns the deferred operation behind RenameField.
+//
+// If newName currently belongs to a void partition field that is not being
+// removed, that void field is first renamed to "<voidName>_<fieldID>" so the
+// requested rename can take the name. addField handles the same collision
+// this way. Previously this case renamed the requested field to
+// "<name>_<voidFieldID>" instead, which is not what the caller asked for.
+//
+// Fields added in this update, unknown names, and fields removed in this
+// update cannot be renamed.
+func (us *UpdateSpec) renameField(name string, newName string) updateSpecOp {
+	return func() error {
+		if existing, ok := us.nameToField[newName]; ok {
+			_, isVoid := existing.Transform.(iceberg.VoidTransform)
+			_, deleted := us.deletes[existing.FieldID]
+			if isVoid && !deleted {
+				if err := us.renameField(
+					existing.Name,
+					fmt.Sprintf("%s_%d", existing.Name, existing.FieldID),
+				)(); err != nil {
+					return err
+				}
+			}
+		}
+		if _, added := us.nameToAddedField[name]; added {
+			return errors.New("cannot rename recently added partitions")
+		}
+
+		field, exists := us.nameToField[name]
+		if !exists {
+			return fmt.Errorf("cannot find partition field: %s", name)
+		}
+		if _, deleted := us.deletes[field.FieldID]; deleted {
+			return fmt.Errorf("cannot delete and rename partition field: %s", name)
+		}
+		us.renames[name] = newName
+
+		return nil
+	}
+}
+
+// partitionField builds the PartitionField for an AddField operation
+// identified by key.
+//
+// On format-version 2+ tables, a field in any historical spec with the same
+// source column and transform is recycled, keeping its field ID and name. This
+// happens when name is empty or equal to that field's name, as in the Java
+// reference implementation. Previously an empty name never recycled.
+// Otherwise a fresh field ID is assigned and, if name is empty, a name is
+// generated from the table schema.
+func (us *UpdateSpec) partitionField(key transformKey, name string) (iceberg.PartitionField, error) {
+	md := us.txn.tbl.Metadata()
+	if md.Version() >= 2 {
+		for _, spec := range md.PartitionSpecs() {
+			for field := range spec.Fields() {
+				if field.SourceID != key.SourceId || field.Transform.String() != key.Transform {
+					continue
+				}
+				if name == "" || field.Name == name {
+					return iceberg.PartitionField{
+						SourceID:  key.SourceId,
+						FieldID:   field.FieldID,
+						Name:      field.Name,
+						Transform: field.Transform,
+					}, nil
+				}
+			}
+		}
+	}
+
+	// key.Transform comes from Transform.String(), so parsing should succeed;
+	// report it rather than building a field with a nil transform if it does not.
+	transform, err := iceberg.ParseTransform(key.Transform)
+	if err != nil {
+		return iceberg.PartitionField{}, err
+	}
+
+	newField := iceberg.PartitionField{
+		SourceID:  key.SourceId,
+		FieldID:   us.newFieldId(),
+		Name:      name,
+		Transform: transform,
+	}
+	if newField.Name == "" {
+		generated, err := iceberg.GeneratePartitionFieldName(us.txn.tbl.Schema(), newField)
+		if err != nil {
+			return iceberg.PartitionField{}, err
+		}
+		newField.Name = generated
+	}
+
+	return newField, nil
+}
+
+// newFieldId reserves and returns the next partition field ID.
+func (us *UpdateSpec) newFieldId() int {
+	us.lastAssignedFieldId++
+
+	return us.lastAssignedFieldId
+}
+
+// isDuplicatePartition reports whether partitionField is still live (not
+// marked for deletion) and already uses an equal transform.
+func (us *UpdateSpec) isDuplicatePartition(transform iceberg.Transform, partitionField iceberg.PartitionField) bool {
+	if _, deleted := us.deletes[partitionField.FieldID]; deleted {
+		return false
+	}
+
+	return transform.Equals(partitionField.Transform)
+}
+
+// checkAndAddPartitionName records name in partitionNames after verifying
+// two things: that it does not match a schema column other than the field's
+// own source column (sourceId), and that it has not been recorded already.
+func (us *UpdateSpec) checkAndAddPartitionName(schema *iceberg.Schema, name string, sourceId int, partitionNames map[string]bool) error {
+	if field, found := schema.FindFieldByName(name); found && field.ID != sourceId {
+		return fmt.Errorf("cannot create partition from name that exists in schema %s", name)
+	}
+	if _, taken := partitionNames[name]; taken {
+		return fmt.Errorf("partition name has to be unique: %s", name)
+	}
+	partitionNames[name] = true
+
+	return nil
+}
+
+// addNewField validates name with checkAndAddPartitionName and, on success,
+// returns the partition field assembled from the given components.
+func (us *UpdateSpec) addNewField(schema *iceberg.Schema, sourceId int, fieldId int, name string, transform iceberg.Transform, partitionNames map[string]bool) (iceberg.PartitionField, error) {
+	if err := us.checkAndAddPartitionName(schema, name, sourceId, partitionNames); err != nil {
+		return iceberg.PartitionField{}, err
+	}
+
+	field := iceberg.PartitionField{
+		SourceID:  sourceId,
+		FieldID:   fieldId,
+		Name:      name,
+		Transform: transform,
+	}
+
+	return field, nil
+}
+
+// isNewPartitionSpec reports whether no spec in the table metadata has ID newSpecId.
+func (us *UpdateSpec) isNewPartitionSpec(newSpecId int) bool {
+	hasID := func(s iceberg.PartitionSpec) bool { return s.ID() == newSpecId }
+
+	return !slices.ContainsFunc(us.txn.tbl.Metadata().PartitionSpecs(), hasID)
+}
diff --git a/table/update_spec_test.go b/table/update_spec_test.go
new file mode 100644
index 0000000..0fc102a
--- /dev/null
+++ b/table/update_spec_test.go
@@ -0,0 +1,491 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+ "testing"
+
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/table"
+ "github.com/stretchr/testify/assert"
+)
+
+// testSchema has three top-level primitive columns (IDs 1-3) and a struct
+// column "address" (ID 4) whose nested fields have IDs 5-7.
+var testSchema = iceberg.NewSchema(1,
+	iceberg.NestedField{ID: 1, Name: "id", Required: true, Type: iceberg.PrimitiveTypes.Int64},
+	iceberg.NestedField{ID: 2, Name: "name", Required: true, Type: iceberg.PrimitiveTypes.String},
+	iceberg.NestedField{ID: 3, Name: "ts", Required: false, Type: iceberg.PrimitiveTypes.Timestamp},
+	iceberg.NestedField{ID: 4, Name: "address", Required: false, Type: &iceberg.StructType{
+		FieldList: []iceberg.NestedField{
+			{ID: 5, Name: "street", Type: iceberg.PrimitiveTypes.String, Required: true},
+			{ID: 6, Name: "city", Type: iceberg.PrimitiveTypes.String, Required: true},
+			{ID: 7, Name: "zip_code", Type: iceberg.PrimitiveTypes.Int64, Required: false},
+		},
+	}},
+)
+
+// partitionSpec partitions by identity(id) (field ID 1000) and by a void
+// transform on address.street (field ID 1001). The void field is used to
+// exercise name-collision handling.
+var partitionSpec = iceberg.NewPartitionSpec(
+	iceberg.PartitionField{
+		SourceID:  1,
+		FieldID:   iceberg.PartitionDataIDStart,
+		Name:      "id_identity",
+		Transform: iceberg.IdentityTransform{},
+	},
+	iceberg.PartitionField{
+		SourceID:  5,
+		FieldID:   iceberg.PartitionDataIDStart + 1,
+		Name:      "street_void",
+		Transform: iceberg.VoidTransform{},
+	})
+
+// NOTE(review): the NewMetadata errors are discarded; a failure here would
+// presumably surface later as a confusing failure on nil metadata.
+var testMetadataNonPartitioned, _ = table.NewMetadata(testSchema, iceberg.UnpartitionedSpec, table.UnsortedSortOrder, "", nil)
+
+var testMetadataPartitioned, _ = table.NewMetadata(testSchema, &partitionSpec, table.UnsortedSortOrder, "", nil)
+
+// Shared fixture tables; each subtest starts its own transaction on one of them.
+var testNonPartitionedTable = table.New([]string{"non_partitioned"}, testMetadataNonPartitioned, "", nil, nil)
+
+var testPartitionedTable = table.New([]string{"partitioned"}, testMetadataPartitioned, "", nil, nil)
+
+// TestUpdateSpecAddField covers AddField:
+//   - successful additions with case-insensitive binding;
+//   - bind and transform validation failures;
+//   - the duplicate-field, duplicate-name and time-transform conflict checks;
+//   - renaming an existing void field out of the way.
+func TestUpdateSpecAddField(t *testing.T) {
+	var txn *table.Transaction
+
+	t.Run("add partition fields", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+		specUpdate := table.NewUpdateSpec(txn, false)
+
+		updates, reqs, err := specUpdate.
+			AddField("ts", iceberg.YearTransform{}, "year_transform").
+			AddField("address.Zip_cOdE", iceberg.BucketTransform{NumBuckets: 5}, "zipcode_bucket").
+			BuildUpdates()
+		assert.NoError(t, err)
+		assert.NotNil(t, updates)
+		assert.NotNil(t, reqs)
+
+		newSpec := specUpdate.Apply()
+		assert.NotNil(t, newSpec)
+		assert.Equal(t, 1, newSpec.ID())
+		assert.Equal(t, 1003, newSpec.LastAssignedFieldID())
+		assert.Equal(t, 4, newSpec.NumFields())
+		assert.Equal(t, "id_identity", newSpec.FieldsBySourceID(1)[0].Name)
+		assert.Equal(t, "street_void", newSpec.FieldsBySourceID(5)[0].Name)
+
+		addedField := newSpec.FieldsBySourceID(3)[0]
+		assert.Equal(t, 3, addedField.SourceID)
+		assert.Equal(t, 1002, addedField.FieldID)
+		assert.Equal(t, "year_transform", addedField.Name)
+		assert.Equal(t, iceberg.YearTransform{}, addedField.Transform)
+
+		addedField = newSpec.FieldsBySourceID(7)[0]
+		assert.Equal(t, 7, addedField.SourceID)
+		assert.Equal(t, 1003, addedField.FieldID)
+		assert.Equal(t, "zipcode_bucket", addedField.Name)
+		assert.Equal(t, iceberg.BucketTransform{NumBuckets: 5}, addedField.Transform)
+	})
+
+	t.Run("add partition field case sensitive", func(t *testing.T) {
+		txn = testNonPartitionedTable.NewTransaction()
+		updates := table.NewUpdateSpec(txn, true)
+		_, _, err := updates.
+			AddField("NaMe", iceberg.VoidTransform{}, "name_void").
+			BuildUpdates()
+		assert.Error(t, err)
+		assert.ErrorContains(t, err, "invalid schema: could not bind reference")
+	})
+
+	t.Run("add invalid partition transform field", func(t *testing.T) {
+		txn = testNonPartitionedTable.NewTransaction()
+		specUpdate := table.NewUpdateSpec(txn, true)
+		updates, reqs, err := specUpdate.
+			AddField("name", iceberg.YearTransform{}, "name_year").
+			BuildUpdates()
+		assert.Error(t, err)
+		assert.ErrorContains(t, err, "year cannot transform string values from name")
+		assert.Nil(t, updates)
+		assert.Nil(t, reqs)
+	})
+
+	t.Run("add duplicate partition field", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+		specUpdate := table.NewUpdateSpec(txn, true)
+		updates, reqs, err := specUpdate.
+			AddField("id", iceberg.IdentityTransform{}, "id_transform").
+			BuildUpdates()
+		assert.Error(t, err)
+		assert.ErrorContains(t, err, "duplicate partition field")
+		assert.Nil(t, updates)
+		assert.Nil(t, reqs)
+	})
+
+	t.Run("add already added partition field", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+		updateSpec := table.NewUpdateSpec(txn, true)
+
+		updates, reqs, err := updateSpec.
+			AddField("ts", iceberg.YearTransform{}, "year_transform_1").
+			AddField("ts", iceberg.YearTransform{}, "year_transform_2").
+			BuildUpdates()
+		assert.Error(t, err)
+		assert.ErrorContains(t, err, "already added partition")
+		assert.Nil(t, updates)
+		assert.Nil(t, reqs)
+	})
+
+	t.Run("add duplicate partition field name", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+		specUpdate := table.NewUpdateSpec(txn, true)
+
+		updates, reqs, err := specUpdate.
+			AddField("ts", iceberg.YearTransform{}, "year_transform_1").
+			AddField("ts", iceberg.MonthTransform{}, "year_transform_1").
+			BuildUpdates()
+
+		assert.Error(t, err)
+		assert.ErrorContains(t, err, "already added partition field with name")
+		assert.Nil(t, updates)
+		assert.Nil(t, reqs)
+	})
+
+	t.Run("add conflicted time transform partition field", func(t *testing.T) {
+		txn = testNonPartitionedTable.NewTransaction()
+		updateSpec := table.NewUpdateSpec(txn, true)
+
+		updates, reqs, err := updateSpec.
+			AddField("ts", iceberg.YearTransform{}, "ts_year").
+			AddField("ts", iceberg.MonthTransform{}, "ts_month").
+			BuildUpdates()
+
+		assert.Error(t, err)
+		assert.ErrorContains(t, err, "cannot add time partition field")
+		assert.Nil(t, updates)
+		assert.Nil(t, reqs)
+	})
+
+	// Previously this subtest shared the name "add duplicate partition field"
+	// with an earlier one, which made `go test -run` selection ambiguous.
+	t.Run("add partition field whose name collides with an existing field", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+		updateSpec := table.NewUpdateSpec(txn, true)
+
+		updates, reqs, err := updateSpec.
+			AddField("ts", iceberg.YearTransform{}, "id_identity").
+			BuildUpdates()
+		assert.Error(t, err)
+		assert.ErrorContains(t, err, "cannot add duplicate partition field name")
+		assert.Nil(t, updates)
+		assert.Nil(t, reqs)
+	})
+
+	t.Run("add duplicate partition field name with void transform", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+		specUpdate := table.NewUpdateSpec(txn, true)
+
+		updates, reqs, err := specUpdate.
+			AddField("ts", iceberg.VoidTransform{}, "street_void").
+			BuildUpdates()
+		assert.NoError(t, err)
+		assert.NotNil(t, updates)
+		assert.NotNil(t, reqs)
+
+		// The existing void field is renamed to "<name>_<fieldID>" to free the name.
+		newSpec := specUpdate.Apply()
+		assert.NotNil(t, newSpec)
+		assert.Equal(t, "street_void_1001", newSpec.FieldsBySourceID(5)[0].Name)
+	})
+}
+
+// TestUpdateSpecAddIdentityField checks that AddIdentity adds identity
+// partition fields with generated names and fresh field IDs.
+func TestUpdateSpecAddIdentityField(t *testing.T) {
+	var txn *table.Transaction
+
+	t.Run("add identity partition fields", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+		specUpdate := table.NewUpdateSpec(txn, false)
+		updates, reqs, err := specUpdate.
+			AddIdentity("ts").
+			AddIdentity("name").
+			BuildUpdates()
+		assert.NoError(t, err)
+		assert.NotNil(t, updates)
+		assert.NotNil(t, reqs)
+
+		newSpec := specUpdate.Apply()
+		assert.NotNil(t, newSpec)
+		assert.Equal(t, 1, newSpec.ID())
+		assert.Equal(t, 1003, newSpec.LastAssignedFieldID())
+		assert.Equal(t, 4, newSpec.NumFields())
+		assert.Equal(t, "id_identity", newSpec.FieldsBySourceID(1)[0].Name)
+		assert.Equal(t, "street_void", newSpec.FieldsBySourceID(5)[0].Name)
+
+		// For identity fields the generated name is the source column name.
+		addedField := newSpec.FieldsBySourceID(3)[0]
+		assert.Equal(t, 3, addedField.SourceID)
+		assert.Equal(t, 1002, addedField.FieldID)
+		assert.Equal(t, "ts", addedField.Name)
+		assert.Equal(t, iceberg.IdentityTransform{}, addedField.Transform)
+
+		addedField = newSpec.FieldsBySourceID(2)[0]
+		assert.Equal(t, 2, addedField.SourceID)
+		assert.Equal(t, 1003, addedField.FieldID)
+		assert.Equal(t, "name", addedField.Name)
+		assert.Equal(t, iceberg.IdentityTransform{}, addedField.Transform)
+	})
+}
+
+// TestUpdateSpecRenameField covers successful renames and the rejected
+// cases: renaming an added field, an unknown field, or a removed field.
+func TestUpdateSpecRenameField(t *testing.T) {
+	var txn *table.Transaction
+
+	t.Run("rename partition fields", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+		updateSpec := table.NewUpdateSpec(txn, false)
+		updates, reqs, err := updateSpec.
+			RenameField("id_identity", "new_id_identity").
+			RenameField("street_void", "new_street_void").
+			BuildUpdates()
+		assert.NoError(t, err)
+		assert.NotNil(t, updates)
+		assert.NotNil(t, reqs)
+
+		newSpec := updateSpec.Apply()
+		assert.NotNil(t, newSpec)
+		assert.Equal(t, 1, newSpec.ID())
+		assert.Equal(t, "new_id_identity", newSpec.FieldsBySourceID(1)[0].Name)
+		assert.Equal(t, "new_street_void", newSpec.FieldsBySourceID(5)[0].Name)
+	})
+
+	t.Run("rename recently added partition", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+		updateSpec := table.NewUpdateSpec(txn, false)
+
+		updates, reqs, err := updateSpec.
+			AddField("ts", iceberg.YearTransform{}, "year_transform").
+			RenameField("year_transform", "new_year_transform").
+			BuildUpdates()
+		assert.Error(t, err)
+		assert.ErrorContains(t, err, "cannot rename recently added partitions")
+		assert.Nil(t, updates)
+		assert.Nil(t, reqs)
+	})
+
+	t.Run("rename a partition field that doesn't exist", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+		updateSpec := table.NewUpdateSpec(txn, false)
+
+		updates, reqs, err := updateSpec.
+			RenameField("non_exist_field", "new_non_exist_field").
+			BuildUpdates()
+		assert.Error(t, err)
+		assert.ErrorContains(t, err, "cannot find partition field")
+		assert.Nil(t, updates)
+		assert.Nil(t, reqs)
+	})
+
+	t.Run("rename a partition field deleted in the same transaction", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+		updateSpec := table.NewUpdateSpec(txn, false)
+
+		updates, reqs, err := updateSpec.
+			RemoveField("id_identity").
+			RenameField("id_identity", "new_id_identity").
+			BuildUpdates()
+		assert.Error(t, err)
+		assert.ErrorContains(t, err, "cannot delete and rename partition field")
+		assert.Nil(t, updates)
+		assert.Nil(t, reqs)
+	})
+}
+
+// TestUpdateSpecRemoveField covers removing existing fields and the rejected
+// cases: removing an added field, a renamed field, or an unknown field.
+func TestUpdateSpecRemoveField(t *testing.T) {
+	var txn *table.Transaction
+
+	t.Run("remove existing partition fields", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+		updateSpec := table.NewUpdateSpec(txn, false)
+
+		updates, reqs, err := updateSpec.
+			RemoveField("street_void").
+			RemoveField("id_identity").
+			BuildUpdates()
+		assert.NoError(t, err)
+		assert.NotNil(t, updates)
+		assert.NotNil(t, reqs)
+
+		// A spec with no fields reports PartitionDataIDStart-1 (999) as its
+		// last assigned field ID.
+		newSpec := updateSpec.Apply()
+		assert.NotNil(t, newSpec)
+		assert.Equal(t, 1, newSpec.ID())
+		assert.Equal(t, 999, newSpec.LastAssignedFieldID())
+		assert.Equal(t, 0, newSpec.NumFields())
+		assert.Equal(t, true, newSpec.IsUnpartitioned())
+	})
+
+	t.Run("remove newly added partition field", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+		updateSpec := table.NewUpdateSpec(txn, false)
+
+		updates, reqs, err := updateSpec.
+			AddField("ts", iceberg.YearTransform{}, "year_transform").
+			RemoveField("year_transform").
+			BuildUpdates()
+		assert.Error(t, err)
+		assert.ErrorContains(t, err, "cannot remove newly added field")
+		assert.Nil(t, updates)
+		assert.Nil(t, reqs)
+	})
+
+	t.Run("remove renamed partition field", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+		updateSpec := table.NewUpdateSpec(txn, false)
+
+		updates, reqs, err := updateSpec.
+			RenameField("id_identity", "new_id_identity").
+			RemoveField("id_identity").
+			BuildUpdates()
+		assert.Error(t, err)
+		assert.ErrorContains(t, err, "cannot rename and delete field")
+		assert.Nil(t, updates)
+		assert.Nil(t, reqs)
+	})
+
+	t.Run("remove partition field that doesn't exist", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+		updateSpec := table.NewUpdateSpec(txn, false)
+
+		updates, reqs, err := updateSpec.
+			RemoveField("non_exist_field").
+			BuildUpdates()
+		assert.Error(t, err)
+		assert.ErrorContains(t, err, "cannot find partition field")
+		assert.Nil(t, updates)
+		assert.Nil(t, reqs)
+	})
+}
+
+// TestUpdateSpecBuildChanges checks the update/requirement shape that
+// BuildUpdates emits for a new spec: an add-spec update followed by a
+// set-default-spec update, guarded by a single
+// assert-last-assigned-partition-id requirement.
+func TestUpdateSpecBuildChanges(t *testing.T) {
+	var txn *table.Transaction
+
+	t.Run("build changes on added partition fields", func(t *testing.T) {
+		txn = testNonPartitionedTable.NewTransaction()
+		specUpdate := table.NewUpdateSpec(txn, false)
+
+		updates, reqs, err := specUpdate.
+			AddField("ts", iceberg.YearTransform{}, "year_transform").
+			AddField("address.zip_code", iceberg.BucketTransform{NumBuckets: 5}, "zipcode_bucket").
+			BuildUpdates()
+		assert.NoError(t, err)
+		assert.NotNil(t, updates)
+		assert.NotNil(t, reqs)
+
+		assert.Equal(t, 2, len(updates))
+		assert.Equal(t, 1, len(reqs))
+
+		assert.Equal(t, table.UpdateAddSpec, updates[0].Action())
+		assert.Equal(t, table.UpdateSetDefaultSpec, updates[1].Action())
+		assert.Equal(t, "assert-last-assigned-partition-id", reqs[0].GetType())
+	})
+
+	t.Run("build changes on removed partition field", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+		specUpdate := table.NewUpdateSpec(txn, false)
+
+		updates, reqs, err := specUpdate.
+			RemoveField("street_void").
+			BuildUpdates()
+		assert.NoError(t, err)
+		assert.NotNil(t, updates)
+		assert.NotNil(t, reqs)
+
+		assert.Equal(t, 2, len(updates))
+		assert.Equal(t, 1, len(reqs))
+
+		assert.Equal(t, table.UpdateAddSpec, updates[0].Action())
+		assert.Equal(t, table.UpdateSetDefaultSpec, updates[1].Action())
+		assert.Equal(t, "assert-last-assigned-partition-id", reqs[0].GetType())
+	})
+
+	t.Run("build changes on renamed partition field", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+		specUpdate := table.NewUpdateSpec(txn, false)
+
+		updates, reqs, err := specUpdate.
+			RenameField("street_void", "new_street_void").
+			BuildUpdates()
+
+		assert.NoError(t, err)
+		assert.Equal(t, 2, len(updates))
+		assert.Equal(t, 1, len(reqs))
+
+		assert.Equal(t, table.UpdateAddSpec, updates[0].Action())
+		assert.Equal(t, table.UpdateSetDefaultSpec, updates[1].Action())
+		assert.Equal(t, "assert-last-assigned-partition-id", reqs[0].GetType())
+	})
+}
+
+// TestUpdateSpecCommit checks that Commit stages the new spec on the
+// transaction, propagates build errors, and is a no-op without changes.
+func TestUpdateSpecCommit(t *testing.T) {
+	var txn *table.Transaction
+
+	t.Run("test commit apply changes on transaction", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+
+		specUpdate := table.NewUpdateSpec(txn, false)
+
+		err := specUpdate.
+			AddField("address.city", iceberg.TruncateTransform{Width: 3}, "").
+			AddIdentity("address.zip_code").
+			RenameField("street_void", "new_street_void").
+			RemoveField("id_identity").
+			Commit()
+		assert.NoError(t, err)
+
+		stagedTbl, err := txn.StagedTable()
+		assert.NoError(t, err)
+
+		// id_identity is gone; the two added fields take IDs 1002 and 1003.
+		currSpec := stagedTbl.Spec()
+		assert.NotNil(t, currSpec)
+		assert.Equal(t, 1, currSpec.ID())
+		assert.Equal(t, 1003, currSpec.LastAssignedFieldID())
+		assert.Equal(t, 3, currSpec.NumFields())
+		assert.Equal(t, "new_street_void", currSpec.FieldsBySourceID(5)[0].Name)
+		assert.Equal(t, []iceberg.PartitionField(nil), currSpec.FieldsBySourceID(1))
+
+		// An empty field name yields a generated one from the column path and transform.
+		addedField := currSpec.FieldsBySourceID(6)[0]
+		assert.Equal(t, 6, addedField.SourceID)
+		assert.Equal(t, 1002, addedField.FieldID)
+		assert.Equal(t, "address.city_trunc_3", addedField.Name)
+		assert.Equal(t, iceberg.TruncateTransform{Width: 3}, addedField.Transform)
+
+		addedIdentity := currSpec.FieldsBySourceID(7)[0]
+		assert.Equal(t, 7, addedIdentity.SourceID)
+		assert.Equal(t, 1003, addedIdentity.FieldID)
+		assert.Equal(t, "address.zip_code", addedIdentity.Name)
+		assert.Equal(t, iceberg.IdentityTransform{}, addedIdentity.Transform)
+	})
+
+	t.Run("test commit with build errors", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+
+		specUpdate := table.NewUpdateSpec(txn, false)
+
+		err := specUpdate.
+			AddField("id", iceberg.IdentityTransform{}, "id_transform").
+			Commit()
+		assert.Error(t, err)
+		assert.ErrorContains(t, err, "duplicate partition field")
+	})
+
+	t.Run("test commit with empty updates", func(t *testing.T) {
+		txn = testPartitionedTable.NewTransaction()
+
+		specUpdate := table.NewUpdateSpec(txn, false)
+
+		err := specUpdate.Commit()
+		assert.Nil(t, err)
+	})
+}
diff --git a/transforms.go b/transforms.go
index f59b3f8..8c3e0aa 100644
--- a/transforms.go
+++ b/transforms.go
@@ -550,12 +550,12 @@ func (t TruncateTransform) Project(name string, pred
BoundPredicate) (UnboundPre
var epochTM = time.Unix(0, 0).UTC()
-type timeTransform interface {
+// TimeTransform is the interface shared by the time-based partition
+// transforms (presumably year/month/day/hour; confirm). It is exported so
+// that other packages can detect these transforms with a type assertion.
+// canTransformTime reports true for date and timestamp source types.
+type TimeTransform interface {
Transform
Transformer(Type) (func(any) Optional[int32], error)
}
-func canTransformTime(t timeTransform, sourceType Type) bool {
+func canTransformTime(t TimeTransform, sourceType Type) bool {
switch sourceType.(type) {
case DateType, TimestampType, TimestampTzType:
return true
@@ -564,7 +564,7 @@ func canTransformTime(t timeTransform, sourceType Type)
bool {
}
}
-func projectTimeTransform(t timeTransform, name string, pred BoundPredicate)
(UnboundPredicate, error) {
+func projectTimeTransform(t TimeTransform, name string, pred BoundPredicate)
(UnboundPredicate, error) {
if _, ok := pred.Term().(*BoundTransform); ok {
return projectTransformPredicate(t, name, pred)
}