This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new b511736  feat: reuse schemas & increment their IDS (#495)
b511736 is described below

commit b511736688dbc5e323754bc4462c7d992e01c71a
Author: Tobias Pütz <tob...@minio.io>
AuthorDate: Tue Aug 5 22:53:28 2025 +0200

    feat: reuse schemas & increment their IDS (#495)
    
    Aligns this implementation more closely with the Java & Rust
    implementations, where a functionally equivalent schema is not added a
    second time. It also makes the renumbering of schema IDs match Java/Rust.
    
    Java source:
    https://github.com/apache/iceberg/blob/41c0b17a20c522e4df519bcc429f413e6a2855e5/core/src/main/java/org/apache/iceberg/TableMetadata.java#L117-L138
    Rust source:
    https://github.com/apache/iceberg-rust/blob/c4c006939a3cb3bf684a0d10ba0e66cac484befe/crates/iceberg/src/spec/table_metadata_builder.rs#L1068
---
 catalog/glue/glue_test.go       |  6 +---
 table/arrow_utils.go            | 16 +++++++--
 table/metadata.go               | 78 ++++++++++++++++++++++++++++-------------
 table/metadata_internal_test.go | 71 +++++++++++++++++++++++++++++++++++++
 table/snapshot_producers.go     | 14 +++++---
 table/updates.go                | 19 ++++------
 table/updates_test.go           |  9 ++---
 7 files changed, 159 insertions(+), 54 deletions(-)

diff --git a/catalog/glue/glue_test.go b/catalog/glue/glue_test.go
index d56d4ef..33d0dda 100644
--- a/catalog/glue/glue_test.go
+++ b/catalog/glue/glue_test.go
@@ -1148,11 +1148,7 @@ func TestAlterTableIntegration(t *testing.T) {
        }
        newFields := append(currentSchema.Fields(), addField) // add column 
'new_col'
        newFields = append(newFields[:1], newFields[2:]...)   // drop column 
'bar'
-       updateColumns := table.NewAddSchemaUpdate(
-               iceberg.NewSchemaWithIdentifiers(newSchemaId, 
currentSchema.IdentifierFieldIDs, newFields...),
-               addField.ID,
-               false,
-       )
+       updateColumns := 
table.NewAddSchemaUpdate(iceberg.NewSchemaWithIdentifiers(newSchemaId, 
currentSchema.IdentifierFieldIDs, newFields...))
        setSchema := table.NewSetCurrentSchemaUpdate(newSchemaId)
 
        _, _, err = ctlg.CommitTable(
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 8085698..60a466a 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -1190,7 +1190,14 @@ func filesToDataFiles(ctx context.Context, fileIO 
iceio.IO, meta *MetadataBuilde
                        }
                }()
 
-               currentSchema, currentSpec := meta.CurrentSchema(), 
meta.CurrentSpec()
+               partitionSpec, err := meta.CurrentSpec()
+               if err != nil || partitionSpec == nil {
+                       yield(nil, fmt.Errorf("%w: cannot add files without a 
current spec", err))
+
+                       return
+               }
+
+               currentSchema, currentSpec := meta.CurrentSchema(), 
*partitionSpec
 
                for filePath := range paths {
                        format := tblutils.FormatFromFileName(filePath)
@@ -1287,9 +1294,12 @@ func recordsToDataFiles(ctx context.Context, 
rootLocation string, meta *Metadata
        if err != nil {
                panic(err)
        }
-
+       currentSpec, err := meta.CurrentSpec()
+       if err != nil || currentSpec == nil {
+               panic(fmt.Errorf("%w: cannot write files without a current 
spec", err))
+       }
        nextCount, stopCount := iter.Pull(args.counter)
-       if meta.CurrentSpec().IsUnpartitioned() {
+       if currentSpec.IsUnpartitioned() {
                tasks := func(yield func(WriteTask) bool) {
                        defer stopCount()
 
diff --git a/table/metadata.go b/table/metadata.go
index 6744d5f..d48cb14 100644
--- a/table/metadata.go
+++ b/table/metadata.go
@@ -162,6 +162,8 @@ type MetadataBuilder struct {
 
        // >v1 specific
        lastSequenceNumber *int64
+       // update tracking
+       lastAddedSchemaID *int
 }
 
 func NewMetadataBuilder() (*MetadataBuilder, error) {
@@ -190,7 +192,8 @@ func MetadataBuilderFromBase(metadata Metadata) 
(*MetadataBuilder, error) {
        b.schemaList = slices.Clone(metadata.Schemas())
        b.currentSchemaID = metadata.CurrentSchema().ID
        b.specs = slices.Clone(metadata.PartitionSpecs())
-       b.defaultSpecID = metadata.DefaultPartitionSpec()
+       defaultSpecID := metadata.DefaultPartitionSpec()
+       b.defaultSpecID = defaultSpecID
        b.lastPartitionID = metadata.LastPartitionSpecID()
        b.props = maps.Clone(metadata.Properties())
        b.snapshotList = slices.Clone(metadata.Snapshots())
@@ -214,8 +217,8 @@ func MetadataBuilderFromBase(metadata Metadata) 
(*MetadataBuilder, error) {
 
 func (b *MetadataBuilder) HasChanges() bool { return len(b.updates) > 0 }
 
-func (b *MetadataBuilder) CurrentSpec() iceberg.PartitionSpec {
-       return b.specs[b.defaultSpecID]
+func (b *MetadataBuilder) CurrentSpec() (*iceberg.PartitionSpec, error) {
+       return b.GetSpecByID(b.defaultSpecID)
 }
 
 func (b *MetadataBuilder) CurrentSchema() *iceberg.Schema {
@@ -257,21 +260,30 @@ func (b *MetadataBuilder) currentSnapshot() *Snapshot {
        return s
 }
 
-func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema, newLastColumnID 
int, initial bool) (*MetadataBuilder, error) {
+func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema) (*MetadataBuilder, 
error) {
+       newLastColumnID := schema.HighestFieldID()
        if newLastColumnID < b.lastColumnId {
                return nil, fmt.Errorf("%w: newLastColumnID %d, must be >= %d", 
iceberg.ErrInvalidArgument, newLastColumnID, b.lastColumnId)
        }
 
-       var schemas []*iceberg.Schema
-       if initial {
-               schemas = []*iceberg.Schema{schema}
-       } else {
-               schemas = append(b.schemaList, schema)
+       newSchemaID := b.reuseOrCreateNewSchemaID(schema)
+
+       if _, err := b.GetSchemaByID(newSchemaID); err == nil {
+               if b.lastAddedSchemaID == nil || *b.lastAddedSchemaID != 
newSchemaID {
+                       b.updates = append(b.updates, 
NewAddSchemaUpdate(schema))
+                       b.lastAddedSchemaID = &newSchemaID
+               }
+
+               return b, nil
        }
 
-       b.lastColumnId = newLastColumnID
-       b.schemaList = schemas
-       b.updates = append(b.updates, NewAddSchemaUpdate(schema, 
newLastColumnID, initial))
+       b.lastColumnId = max(b.lastColumnId, schema.HighestFieldID())
+
+       schema.ID = newSchemaID
+
+       b.schemaList = append(b.schemaList, schema)
+       b.updates = append(b.updates, NewAddSchemaUpdate(schema))
+       b.lastAddedSchemaID = &newSchemaID
 
        return b, nil
 }
@@ -384,14 +396,10 @@ func (b *MetadataBuilder) RemoveProperties(keys []string) 
(*MetadataBuilder, err
 
 func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int) 
(*MetadataBuilder, error) {
        if currentSchemaID == -1 {
-               currentSchemaID = maxBy(b.schemaList, func(s *iceberg.Schema) 
int {
-                       return s.ID
-               })
-               if !slices.ContainsFunc(b.updates, func(u Update) bool {
-                       return u.Action() == UpdateAddSchema && 
u.(*addSchemaUpdate).Schema.ID == currentSchemaID
-               }) {
+               if b.lastAddedSchemaID == nil {
                        return nil, errors.New("can't set current schema to 
last added schema, no schema has been added")
                }
+               currentSchemaID = *b.lastAddedSchemaID
        }
 
        if currentSchemaID == b.currentSchemaID {
@@ -641,7 +649,12 @@ func (b *MetadataBuilder) SetLastUpdatedMS() 
*MetadataBuilder {
        return b
 }
 
-func (b *MetadataBuilder) buildCommonMetadata() *commonMetadata {
+func (b *MetadataBuilder) buildCommonMetadata() (*commonMetadata, error) {
+       if _, err := b.GetSpecByID(b.defaultSpecID); err != nil {
+               return nil, fmt.Errorf("defaultSpecID is invalid: %w", err)
+       }
+       defaultSpecID := b.defaultSpecID
+
        if b.lastUpdatedMS == 0 {
                b.lastUpdatedMS = time.Now().UnixMilli()
        }
@@ -655,7 +668,7 @@ func (b *MetadataBuilder) buildCommonMetadata() 
*commonMetadata {
                SchemaList:         b.schemaList,
                CurrentSchemaID:    b.currentSchemaID,
                Specs:              b.specs,
-               DefaultSpecID:      b.defaultSpecID,
+               DefaultSpecID:      defaultSpecID,
                LastPartitionID:    b.lastPartitionID,
                Props:              b.props,
                SnapshotList:       b.snapshotList,
@@ -665,7 +678,7 @@ func (b *MetadataBuilder) buildCommonMetadata() 
*commonMetadata {
                SortOrderList:      b.sortOrderList,
                DefaultSortOrderID: b.defaultSortOrderID,
                SnapshotRefs:       b.refs,
-       }
+       }, nil
 }
 
 func (b *MetadataBuilder) GetSchemaByID(id int) (*iceberg.Schema, error) {
@@ -736,7 +749,10 @@ func (b *MetadataBuilder) AppendMetadataLog(entry 
MetadataLogEntry) *MetadataBui
 }
 
 func (b *MetadataBuilder) Build() (Metadata, error) {
-       common := b.buildCommonMetadata()
+       common, err := b.buildCommonMetadata()
+       if err != nil {
+               return nil, err
+       }
        if err := common.validate(); err != nil {
                return nil, err
        }
@@ -748,7 +764,7 @@ func (b *MetadataBuilder) Build() (Metadata, error) {
                        return nil, fmt.Errorf("can't build metadata, missing 
schema for schema ID %d: %w", b.currentSchemaID, err)
                }
 
-               partition, err := b.GetSpecByID(b.defaultSpecID)
+               partition, err := b.GetSpecByID(common.DefaultSpecID)
                if err != nil {
                        return nil, fmt.Errorf("can't build metadata, missing 
partition spec for spec ID %d: %w", b.defaultSpecID, err)
                }
@@ -777,10 +793,24 @@ func (b *MetadataBuilder) Build() (Metadata, error) {
                }, nil
 
        default:
-               panic("unreachable: invalid format version")
+               return nil, fmt.Errorf("unknown format version %d", 
b.formatVersion)
        }
 }
 
+func (b *MetadataBuilder) reuseOrCreateNewSchemaID(newSchema *iceberg.Schema) 
int {
+       newSchemaID := newSchema.ID
+       for _, schema := range b.schemaList {
+               if newSchema.Equals(schema) {
+                       return schema.ID
+               }
+               if schema.ID >= newSchemaID {
+                       newSchemaID = schema.ID + 1
+               }
+       }
+
+       return newSchemaID
+}
+
 // maxBy returns the maximum value of extract(e) for all e in elems.
 // If elems is empty, returns 0.
 func maxBy[S ~[]E, E any](elems S, extract func(e E) int) int {
diff --git a/table/metadata_internal_test.go b/table/metadata_internal_test.go
index 723a407..6ca47a7 100644
--- a/table/metadata_internal_test.go
+++ b/table/metadata_internal_test.go
@@ -764,6 +764,77 @@ func TestMetadataBuilderSetDefaultSpecIDLastPartition(t 
*testing.T) {
        assert.Equal(t, 0, builder.defaultSpecID)
 }
 
+func TestMetadataBuilderSetLastAddedSchema(t *testing.T) {
+       builder, err := NewMetadataBuilder()
+       assert.NoError(t, err)
+       _, err = builder.SetFormatVersion(2)
+       assert.NoError(t, err)
+       schema := iceberg.NewSchema(1,
+               iceberg.NestedField{ID: 1, Name: "foo", Type: 
iceberg.StringType{}, Required: true},
+       )
+       _, err = builder.AddSchema(schema)
+       assert.NoError(t, err)
+       _, err = builder.SetCurrentSchemaID(-1)
+       assert.NoError(t, err)
+
+       partitionSpec := iceberg.NewPartitionSpecID(0)
+       _, err = builder.AddPartitionSpec(&partitionSpec, false)
+       assert.NoError(t, err)
+
+       _, err = builder.SetDefaultSpecID(-1)
+       assert.NoError(t, err)
+
+       meta, err := builder.Build()
+       assert.NoError(t, err)
+       assert.Equal(t, schema.ID, meta.CurrentSchema().ID)
+       assert.True(t, schema.Equals(meta.CurrentSchema()))
+}
+
+func TestMetadataBuilderSchemaIncreasingNumbering(t *testing.T) {
+       builder, err := NewMetadataBuilder()
+       assert.NoError(t, err)
+       _, err = builder.SetFormatVersion(2)
+       assert.NoError(t, err)
+       schema := iceberg.NewSchema(1,
+               iceberg.NestedField{ID: 1, Name: "foo", Type: 
iceberg.StringType{}, Required: true},
+       )
+       _, err = builder.AddSchema(schema)
+       assert.NoError(t, err)
+
+       schema = iceberg.NewSchema(3,
+               iceberg.NestedField{ID: 3, Name: "foo", Type: 
iceberg.StringType{}, Required: true},
+       )
+       _, err = builder.AddSchema(schema)
+       assert.NoError(t, err)
+
+       schema = iceberg.NewSchema(2,
+               iceberg.NestedField{ID: 4, Name: "foo", Type: 
iceberg.StringType{}, Required: true},
+       )
+       _, err = builder.AddSchema(schema)
+       assert.NoError(t, err)
+
+       assert.Equal(t, 1, builder.schemaList[0].ID)
+       assert.Equal(t, 3, builder.schemaList[1].ID)
+       assert.Equal(t, 4, builder.schemaList[2].ID)
+}
+
+func TestMetadataBuilderReuseSchema(t *testing.T) {
+       builder, err := NewMetadataBuilder()
+       assert.NoError(t, err)
+       schema := iceberg.NewSchema(1,
+               iceberg.NestedField{ID: 1, Name: "foo", Type: 
iceberg.StringType{}, Required: true},
+       )
+       _, err = builder.AddSchema(schema)
+       assert.NoError(t, err)
+       schema2 := iceberg.NewSchema(15,
+               iceberg.NestedField{ID: 1, Name: "foo", Type: 
iceberg.StringType{}, Required: true},
+       )
+       _, err = builder.AddSchema(schema2)
+       assert.NoError(t, err)
+       assert.Equal(t, len(builder.schemaList), 1)
+       assert.Equal(t, *builder.lastAddedSchemaID, 1)
+}
+
 func TestMetadataV1Validation(t *testing.T) {
        // Test case 1: JSON with no last-column-id field
        noColumnID := `{
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 562ef68..58721ed 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -525,9 +525,12 @@ func (sp *snapshotProducer) manifests() 
([]iceberg.ManifestFile, error) {
                        defer out.Close()
 
                        counter := &internal.CountingWriter{W: out}
-
+                       currentSpec, err := sp.txn.meta.CurrentSpec()
+                       if err != nil || currentSpec == nil {
+                               return fmt.Errorf("could not get current 
partition spec: %w", err)
+                       }
                        wr, err := 
iceberg.NewManifestWriter(sp.txn.meta.formatVersion, counter,
-                               sp.txn.meta.CurrentSpec(), 
sp.txn.meta.CurrentSchema(),
+                               *currentSpec, sp.txn.meta.CurrentSchema(),
                                sp.snapshotID)
                        if err != nil {
                                return err
@@ -615,9 +618,12 @@ func (sp *snapshotProducer) summary(props 
iceberg.Properties) (Summary, error) {
        ssc.setPartitionSummaryLimit(partitionSummaryLimit)
 
        currentSchema := sp.txn.meta.CurrentSchema()
-       partitionSpec := sp.txn.meta.CurrentSpec()
+       partitionSpec, err := sp.txn.meta.CurrentSpec()
+       if err != nil || partitionSpec == nil {
+               return Summary{}, fmt.Errorf("could not get current partition 
spec: %w", err)
+       }
        for _, df := range sp.addedFiles {
-               ssc.addFile(df, currentSchema, partitionSpec)
+               ssc.addFile(df, currentSchema, *partitionSpec)
        }
 
        if len(sp.deletedFiles) > 0 {
diff --git a/table/updates.go b/table/updates.go
index fae5a8d..af6f139 100644
--- a/table/updates.go
+++ b/table/updates.go
@@ -180,25 +180,20 @@ func (u *upgradeFormatVersionUpdate) Apply(builder 
*MetadataBuilder) error {
 
 type addSchemaUpdate struct {
        baseUpdate
-       Schema       *iceberg.Schema `json:"schema"`
-       LastColumnID int             `json:"last-column-id"`
-       initial      bool
+       Schema  *iceberg.Schema `json:"schema"`
+       initial bool
 }
 
-// NewAddSchemaUpdate creates a new update that adds the given schema and last 
column ID to
-// the table metadata. If the initial flag is set to true, the schema is 
considered the initial
-// schema of the table, and all previously added schemas in the metadata 
builder are removed.
-func NewAddSchemaUpdate(schema *iceberg.Schema, lastColumnID int, initial 
bool) *addSchemaUpdate {
+// NewAddSchemaUpdate creates a new update that adds the given schema and 
updates the lastColumnID based on the schema.
+func NewAddSchemaUpdate(schema *iceberg.Schema) *addSchemaUpdate {
        return &addSchemaUpdate{
-               baseUpdate:   baseUpdate{ActionName: UpdateAddSchema},
-               Schema:       schema,
-               LastColumnID: lastColumnID,
-               initial:      initial,
+               baseUpdate: baseUpdate{ActionName: UpdateAddSchema},
+               Schema:     schema,
        }
 }
 
 func (u *addSchemaUpdate) Apply(builder *MetadataBuilder) error {
-       _, err := builder.AddSchema(u.Schema, u.LastColumnID, u.initial)
+       _, err := builder.AddSchema(u.Schema)
 
        return err
 }
diff --git a/table/updates_test.go b/table/updates_test.go
index d236a01..0c5eeea 100644
--- a/table/updates_test.go
+++ b/table/updates_test.go
@@ -143,11 +143,9 @@ func TestUnmarshalUpdates(t *testing.T) {
   }
 ]`),
                        expected: Updates{
-                               NewAddSchemaUpdate(
-                                       iceberg.NewSchema(1,
-                                               iceberg.NestedField{ID: 1, 
Name: "foo", Type: iceberg.StringType{}, Required: true},
-                                       ), 1, false,
-                               ),
+                               NewAddSchemaUpdate(iceberg.NewSchema(1,
+                                       iceberg.NestedField{ID: 1, Name: "foo", 
Type: iceberg.StringType{}, Required: true},
+                               )),
                                NewAddPartitionSpecUpdate(
                                        &spec, false),
                                NewAddSortOrderUpdate(&sortOrder, false),
@@ -202,7 +200,6 @@ func TestUnmarshalUpdates(t *testing.T) {
                                                actualAddSchema := 
actual[idx].(*addSchemaUpdate)
                                                assert.True(t, 
expectedAddSchema.Schema.Equals(actualAddSchema.Schema))
                                                assert.Equal(t, 
actualAddSchema.initial, expectedAddSchema.initial)
-                                               assert.Equal(t, 
actualAddSchema.LastColumnID, expectedAddSchema.LastColumnID)
                                        case "add-partition-spec":
                                                expectedAddPartitionSpec := 
u.(*addPartitionSpecUpdate)
                                                actualAddPartitionSpec := 
actual[idx].(*addPartitionSpecUpdate)

Reply via email to