This is an automated email from the ASF dual-hosted git repository. zeroshade pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push: new b511736 feat: reuse schemas & increment their IDS (#495) b511736 is described below commit b511736688dbc5e323754bc4462c7d992e01c71a Author: Tobias Pütz <tob...@minio.io> AuthorDate: Tue Aug 5 22:53:28 2025 +0200 feat: reuse schemas & increment their IDS (#495) Aligns us a bit more with the Java & Rust implementations where functionally equivalent schemas don't get added a second time. It also aligns renumbering of schemas to be equivalent to java/rust. Java source: https://github.com/apache/iceberg/blob/41c0b17a20c522e4df519bcc429f413e6a2855e5/core/src/main/java/org/apache/iceberg/TableMetadata.java#L117-L138 Rust source: https://github.com/apache/iceberg-rust/blob/c4c006939a3cb3bf684a0d10ba0e66cac484befe/crates/iceberg/src/spec/table_metadata_builder.rs#L1068 --- catalog/glue/glue_test.go | 6 +--- table/arrow_utils.go | 16 +++++++-- table/metadata.go | 78 ++++++++++++++++++++++++++++------------- table/metadata_internal_test.go | 71 +++++++++++++++++++++++++++++++++++++ table/snapshot_producers.go | 14 +++++--- table/updates.go | 19 ++++------ table/updates_test.go | 9 ++--- 7 files changed, 159 insertions(+), 54 deletions(-) diff --git a/catalog/glue/glue_test.go b/catalog/glue/glue_test.go index d56d4ef..33d0dda 100644 --- a/catalog/glue/glue_test.go +++ b/catalog/glue/glue_test.go @@ -1148,11 +1148,7 @@ func TestAlterTableIntegration(t *testing.T) { } newFields := append(currentSchema.Fields(), addField) // add column 'new_col' newFields = append(newFields[:1], newFields[2:]...) // drop column 'bar' - updateColumns := table.NewAddSchemaUpdate( - iceberg.NewSchemaWithIdentifiers(newSchemaId, currentSchema.IdentifierFieldIDs, newFields...), - addField.ID, - false, - ) + updateColumns := table.NewAddSchemaUpdate(iceberg.NewSchemaWithIdentifiers(newSchemaId, currentSchema.IdentifierFieldIDs, newFields...)) setSchema := table.NewSetCurrentSchemaUpdate(newSchemaId) _, _, err = ctlg.CommitTable( diff --git a/table/arrow_utils.go b/table/arrow_utils.go index 8085698..60a466a 100644 --- a/table/arrow_utils.go +++ b/table/arrow_utils.go @@ -1190,7 +1190,14 @@ func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta *MetadataBuilde } }() - currentSchema, currentSpec := meta.CurrentSchema(), meta.CurrentSpec() + partitionSpec, err := meta.CurrentSpec() + if err != nil || partitionSpec == nil { + yield(nil, fmt.Errorf("%w: cannot add files without a current spec", err)) + + return + } + + currentSchema, currentSpec := meta.CurrentSchema(), *partitionSpec for filePath := range paths { format := tblutils.FormatFromFileName(filePath) @@ -1287,9 +1294,12 @@ func recordsToDataFiles(ctx context.Context, rootLocation string, meta *Metadata if err != nil { panic(err) } - + currentSpec, err := meta.CurrentSpec() + if err != nil || currentSpec == nil { + panic(fmt.Errorf("%w: cannot write files without a current spec", err)) + } nextCount, stopCount := iter.Pull(args.counter) - if meta.CurrentSpec().IsUnpartitioned() { + if currentSpec.IsUnpartitioned() { tasks := func(yield func(WriteTask) bool) { defer stopCount() diff --git a/table/metadata.go b/table/metadata.go index 6744d5f..d48cb14 100644 --- a/table/metadata.go +++ b/table/metadata.go @@ -162,6 +162,8 @@ type MetadataBuilder struct { // >v1 specific lastSequenceNumber *int64 + // update tracking + lastAddedSchemaID *int } func NewMetadataBuilder() (*MetadataBuilder, error) { @@ -190,7 +192,8 @@ func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) { b.schemaList = slices.Clone(metadata.Schemas()) b.currentSchemaID = metadata.CurrentSchema().ID b.specs = slices.Clone(metadata.PartitionSpecs()) - b.defaultSpecID = metadata.DefaultPartitionSpec() + defaultSpecID := metadata.DefaultPartitionSpec() + b.defaultSpecID = defaultSpecID b.lastPartitionID = metadata.LastPartitionSpecID() b.props = maps.Clone(metadata.Properties()) b.snapshotList = slices.Clone(metadata.Snapshots()) @@ -214,8 +217,8 @@ func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) { func (b *MetadataBuilder) HasChanges() bool { return len(b.updates) > 0 } -func (b *MetadataBuilder) CurrentSpec() iceberg.PartitionSpec { - return b.specs[b.defaultSpecID] +func (b *MetadataBuilder) CurrentSpec() (*iceberg.PartitionSpec, error) { + return b.GetSpecByID(b.defaultSpecID) } func (b *MetadataBuilder) CurrentSchema() *iceberg.Schema { @@ -257,21 +260,30 @@ func (b *MetadataBuilder) currentSnapshot() *Snapshot { return s } -func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema, newLastColumnID int, initial bool) (*MetadataBuilder, error) { +func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema) (*MetadataBuilder, error) { + newLastColumnID := schema.HighestFieldID() if newLastColumnID < b.lastColumnId { return nil, fmt.Errorf("%w: newLastColumnID %d, must be >= %d", iceberg.ErrInvalidArgument, newLastColumnID, b.lastColumnId) } - var schemas []*iceberg.Schema - if initial { - schemas = []*iceberg.Schema{schema} - } else { - schemas = append(b.schemaList, schema) + newSchemaID := b.reuseOrCreateNewSchemaID(schema) + + if _, err := b.GetSchemaByID(newSchemaID); err == nil { + if b.lastAddedSchemaID == nil || *b.lastAddedSchemaID != newSchemaID { + b.updates = append(b.updates, NewAddSchemaUpdate(schema)) + b.lastAddedSchemaID = &newSchemaID + } + + return b, nil } - b.lastColumnId = newLastColumnID - b.schemaList = schemas - b.updates = append(b.updates, NewAddSchemaUpdate(schema, newLastColumnID, initial)) + b.lastColumnId = max(b.lastColumnId, schema.HighestFieldID()) + + schema.ID = newSchemaID + + b.schemaList = append(b.schemaList, schema) + b.updates = append(b.updates, NewAddSchemaUpdate(schema)) + b.lastAddedSchemaID = &newSchemaID return b, nil } @@ -384,14 +396,10 @@ func (b *MetadataBuilder) RemoveProperties(keys []string) (*MetadataBuilder, err func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int) (*MetadataBuilder, error) { if currentSchemaID == -1 { - currentSchemaID = maxBy(b.schemaList, func(s *iceberg.Schema) int { - return s.ID - }) - if !slices.ContainsFunc(b.updates, func(u Update) bool { - return u.Action() == UpdateAddSchema && u.(*addSchemaUpdate).Schema.ID == currentSchemaID - }) { + if b.lastAddedSchemaID == nil { return nil, errors.New("can't set current schema to last added schema, no schema has been added") } + currentSchemaID = *b.lastAddedSchemaID } if currentSchemaID == b.currentSchemaID { @@ -641,7 +649,12 @@ func (b *MetadataBuilder) SetLastUpdatedMS() *MetadataBuilder { return b } -func (b *MetadataBuilder) buildCommonMetadata() *commonMetadata { +func (b *MetadataBuilder) buildCommonMetadata() (*commonMetadata, error) { + if _, err := b.GetSpecByID(b.defaultSpecID); err != nil { + return nil, fmt.Errorf("defaultSpecID is invalid: %w", err) + } + defaultSpecID := b.defaultSpecID + if b.lastUpdatedMS == 0 { b.lastUpdatedMS = time.Now().UnixMilli() } @@ -655,7 +668,7 @@ func (b *MetadataBuilder) buildCommonMetadata() *commonMetadata { SchemaList: b.schemaList, CurrentSchemaID: b.currentSchemaID, Specs: b.specs, - DefaultSpecID: b.defaultSpecID, + DefaultSpecID: defaultSpecID, LastPartitionID: b.lastPartitionID, Props: b.props, SnapshotList: b.snapshotList, @@ -665,7 +678,7 @@ func (b *MetadataBuilder) buildCommonMetadata() *commonMetadata { SortOrderList: b.sortOrderList, DefaultSortOrderID: b.defaultSortOrderID, SnapshotRefs: b.refs, - } + }, nil } func (b *MetadataBuilder) GetSchemaByID(id int) (*iceberg.Schema, error) { @@ -736,7 +749,10 @@ func (b *MetadataBuilder) AppendMetadataLog(entry MetadataLogEntry) *MetadataBui } func (b *MetadataBuilder) Build() (Metadata, error) { - common := b.buildCommonMetadata() + common, err := b.buildCommonMetadata() + if err != nil { + return nil, err + } if err := common.validate(); err != nil { return nil, err } @@ -748,7 +764,7 @@ func (b *MetadataBuilder) Build() (Metadata, error) { return nil, fmt.Errorf("can't build metadata, missing schema for schema ID %d: %w", b.currentSchemaID, err) } - partition, err := b.GetSpecByID(b.defaultSpecID) + partition, err := b.GetSpecByID(common.DefaultSpecID) if err != nil { return nil, fmt.Errorf("can't build metadata, missing partition spec for spec ID %d: %w", b.defaultSpecID, err) } @@ -777,10 +793,24 @@ func (b *MetadataBuilder) Build() (Metadata, error) { }, nil default: - panic("unreachable: invalid format version") + return nil, fmt.Errorf("unknown format version %d", b.formatVersion) } } +func (b *MetadataBuilder) reuseOrCreateNewSchemaID(newSchema *iceberg.Schema) int { + newSchemaID := newSchema.ID + for _, schema := range b.schemaList { + if newSchema.Equals(schema) { + return schema.ID + } + if schema.ID >= newSchemaID { + newSchemaID = schema.ID + 1 + } + } + + return newSchemaID +} + // maxBy returns the maximum value of extract(e) for all e in elems. // If elems is empty, returns 0. func maxBy[S ~[]E, E any](elems S, extract func(e E) int) int { diff --git a/table/metadata_internal_test.go b/table/metadata_internal_test.go index 723a407..6ca47a7 100644 --- a/table/metadata_internal_test.go +++ b/table/metadata_internal_test.go @@ -764,6 +764,77 @@ func TestMetadataBuilderSetDefaultSpecIDLastPartition(t *testing.T) { assert.Equal(t, 0, builder.defaultSpecID) } +func TestMetadataBuilderSetLastAddedSchema(t *testing.T) { + builder, err := NewMetadataBuilder() + assert.NoError(t, err) + _, err = builder.SetFormatVersion(2) + assert.NoError(t, err) + schema := iceberg.NewSchema(1, + iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.StringType{}, Required: true}, + ) + _, err = builder.AddSchema(schema) + assert.NoError(t, err) + _, err = builder.SetCurrentSchemaID(-1) + assert.NoError(t, err) + + partitionSpec := iceberg.NewPartitionSpecID(0) + _, err = builder.AddPartitionSpec(&partitionSpec, false) + assert.NoError(t, err) + + _, err = builder.SetDefaultSpecID(-1) + assert.NoError(t, err) + + meta, err := builder.Build() + assert.NoError(t, err) + assert.Equal(t, schema.ID, meta.CurrentSchema().ID) + assert.True(t, schema.Equals(meta.CurrentSchema())) +} + +func TestMetadataBuilderSchemaIncreasingNumbering(t *testing.T) { + builder, err := NewMetadataBuilder() + assert.NoError(t, err) + _, err = builder.SetFormatVersion(2) + assert.NoError(t, err) + schema := iceberg.NewSchema(1, + iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.StringType{}, Required: true}, + ) + _, err = builder.AddSchema(schema) + assert.NoError(t, err) + + schema = iceberg.NewSchema(3, + iceberg.NestedField{ID: 3, Name: "foo", Type: iceberg.StringType{}, Required: true}, + ) + _, err = builder.AddSchema(schema) + assert.NoError(t, err) + + schema = iceberg.NewSchema(2, + iceberg.NestedField{ID: 4, Name: "foo", Type: iceberg.StringType{}, Required: true}, + ) + _, err = builder.AddSchema(schema) + assert.NoError(t, err) + + assert.Equal(t, 1, builder.schemaList[0].ID) + assert.Equal(t, 3, builder.schemaList[1].ID) + assert.Equal(t, 4, builder.schemaList[2].ID) +} + +func TestMetadataBuilderReuseSchema(t *testing.T) { + builder, err := NewMetadataBuilder() + assert.NoError(t, err) + schema := iceberg.NewSchema(1, + iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.StringType{}, Required: true}, + ) + _, err = builder.AddSchema(schema) + assert.NoError(t, err) + schema2 := iceberg.NewSchema(15, + iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.StringType{}, Required: true}, + ) + _, err = builder.AddSchema(schema2) + assert.NoError(t, err) + assert.Equal(t, len(builder.schemaList), 1) + assert.Equal(t, *builder.lastAddedSchemaID, 1) +} + func TestMetadataV1Validation(t *testing.T) { // Test case 1: JSON with no last-column-id field noColumnID := `{ diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go index 562ef68..58721ed 100644 --- a/table/snapshot_producers.go +++ b/table/snapshot_producers.go @@ -525,9 +525,12 @@ func (sp *snapshotProducer) manifests() ([]iceberg.ManifestFile, error) { defer out.Close() counter := &internal.CountingWriter{W: out} - + currentSpec, err := sp.txn.meta.CurrentSpec() + if err != nil || currentSpec == nil { + return fmt.Errorf("could not get current partition spec: %w", err) + } wr, err := iceberg.NewManifestWriter(sp.txn.meta.formatVersion, counter, - sp.txn.meta.CurrentSpec(), sp.txn.meta.CurrentSchema(), + *currentSpec, sp.txn.meta.CurrentSchema(), sp.snapshotID) if err != nil { return err @@ -615,9 +618,12 @@ func (sp *snapshotProducer) summary(props iceberg.Properties) (Summary, error) { ssc.setPartitionSummaryLimit(partitionSummaryLimit) currentSchema := sp.txn.meta.CurrentSchema() - partitionSpec := sp.txn.meta.CurrentSpec() + partitionSpec, err := sp.txn.meta.CurrentSpec() + if err != nil || partitionSpec == nil { + return Summary{}, fmt.Errorf("could not get current partition spec: %w", err) + } for _, df := range sp.addedFiles { - ssc.addFile(df, currentSchema, partitionSpec) + ssc.addFile(df, currentSchema, *partitionSpec) } if len(sp.deletedFiles) > 0 { diff --git a/table/updates.go b/table/updates.go index fae5a8d..af6f139 100644 --- a/table/updates.go +++ b/table/updates.go @@ -180,25 +180,20 @@ func (u *upgradeFormatVersionUpdate) Apply(builder *MetadataBuilder) error { type addSchemaUpdate struct { baseUpdate - Schema *iceberg.Schema `json:"schema"` - LastColumnID int `json:"last-column-id"` - initial bool + Schema *iceberg.Schema `json:"schema"` + initial bool } -// NewAddSchemaUpdate creates a new update that adds the given schema and last column ID to -// the table metadata. If the initial flag is set to true, the schema is considered the initial -// schema of the table, and all previously added schemas in the metadata builder are removed. -func NewAddSchemaUpdate(schema *iceberg.Schema, lastColumnID int, initial bool) *addSchemaUpdate { +// NewAddSchemaUpdate creates a new update that adds the given schema and updates the lastColumnID based on the schema. +func NewAddSchemaUpdate(schema *iceberg.Schema) *addSchemaUpdate { return &addSchemaUpdate{ - baseUpdate: baseUpdate{ActionName: UpdateAddSchema}, - Schema: schema, - LastColumnID: lastColumnID, - initial: initial, + baseUpdate: baseUpdate{ActionName: UpdateAddSchema}, + Schema: schema, } } func (u *addSchemaUpdate) Apply(builder *MetadataBuilder) error { - _, err := builder.AddSchema(u.Schema, u.LastColumnID, u.initial) + _, err := builder.AddSchema(u.Schema) return err } diff --git a/table/updates_test.go b/table/updates_test.go index d236a01..0c5eeea 100644 --- a/table/updates_test.go +++ b/table/updates_test.go @@ -143,11 +143,9 @@ func TestUnmarshalUpdates(t *testing.T) { } ]`), expected: Updates{ - NewAddSchemaUpdate( - iceberg.NewSchema(1, - iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.StringType{}, Required: true}, - ), 1, false, - ), + NewAddSchemaUpdate(iceberg.NewSchema(1, + iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.StringType{}, Required: true}, + )), NewAddPartitionSpecUpdate( &spec, false), NewAddSortOrderUpdate(&sortOrder, false), @@ -202,7 +200,6 @@ func TestUnmarshalUpdates(t *testing.T) { actualAddSchema := actual[idx].(*addSchemaUpdate) assert.True(t, expectedAddSchema.Schema.Equals(actualAddSchema.Schema)) assert.Equal(t, actualAddSchema.initial, expectedAddSchema.initial) - assert.Equal(t, actualAddSchema.LastColumnID, expectedAddSchema.LastColumnID) case "add-partition-spec": expectedAddPartitionSpec := u.(*addPartitionSpecUpdate) actualAddPartitionSpec := actual[idx].(*addPartitionSpecUpdate)