This is an automated email from the ASF dual-hosted git repository. zeroshade pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push: new 038457bd feat(metadata): PartitionSpec builder & validations (#558) 038457bd is described below commit 038457bd0a73aa5e8add5389777d776e6561675f Author: Tobias Pütz <tob...@minio.io> AuthorDate: Wed Sep 17 22:20:07 2025 +0200 feat(metadata): PartitionSpec builder & validations (#558) This PR: 1. adds a funct-opts builder for PartitionSpec 2. implements the remove PartitionSpec Update 3. ports tests relating to PartitionSpecs from iceberg-rust 4. ports missing validations from iceberg-rust to metadataBuilder It's mostly contained within partitions, there's a more fundamental change to json deserialization in there which initializes the deserialization target with `-1` in all int fields so that we are able to determine if we received / did not receive a certain field and are consequently able to reject requests where mandatory IDs are missing. There's currently still some duplication between the funct-opts constructor of `PartitionSpec` and `update_spec.go`. 
I'm still working on that one, I hope to arrive at something close to [java's apply() of BaseUpdatePartitionSpec](https://github.com/apache/iceberg/blob/8871bbcf4ffce83be7d1be8d75bf06e5ce7b36e4/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java#L304) --- catalog/catalog.go | 9 + catalog/rest/rest.go | 23 +- partitions.go | 226 ++++++++++++++--- table/arrow_utils_internal_test.go | 1 + table/internal/parquet_files_test.go | 1 + table/metadata.go | 276 ++++++++++++++++----- table/metadata_builder_internal_test.go | 165 +++++++++++- table/metadata_internal_test.go | 47 ++++ table/orphan_cleanup_integration_test.go | 21 +- ...leMetadataV1PartitionSpecsWithoutDefaultId.json | 58 +++++ .../TableMetadataV2MissingLastPartitionId.json | 73 ++++++ .../TableMetadataV2MissingPartitionSpecs.json | 67 +++++ table/transaction_test.go | 8 +- table/update_spec.go | 22 +- table/update_spec_test.go | 15 +- table/updates.go | 8 +- table/updates_test.go | 12 - 17 files changed, 885 insertions(+), 147 deletions(-) diff --git a/catalog/catalog.go b/catalog/catalog.go index 3fec5df9..fb87ead6 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -76,6 +76,15 @@ type CreateTableCfg struct { Properties iceberg.Properties } +func NewCreateTableCfg() CreateTableCfg { + return CreateTableCfg{ + Location: "", + PartitionSpec: nil, + SortOrder: table.UnsortedSortOrder, + Properties: nil, + } +} + // Catalog for iceberg table operations like create, drop, load, list and others. type Catalog interface { // CatalogType returns the type of the catalog. 
diff --git a/catalog/rest/rest.go b/catalog/rest/rest.go index 1c61a7dd..04983d23 100644 --- a/catalog/rest/rest.go +++ b/catalog/rest/rest.go @@ -745,32 +745,21 @@ func (r *Catalog) CreateTable(ctx context.Context, identifier table.Identifier, return nil, err } - var cfg catalog.CreateTableCfg + cfg := catalog.NewCreateTableCfg() for _, o := range opts { o(&cfg) } - freshSchema, err := iceberg.AssignFreshSchemaIDs(schema, nil) - if err != nil { - return nil, err - } - - freshPartitionSpec, err := iceberg.AssignFreshPartitionSpecIDs(cfg.PartitionSpec, schema, freshSchema) - if err != nil { - return nil, err - } - - freshSortOrder, err := table.AssignFreshSortOrderIDs(cfg.SortOrder, schema, freshSchema) - if err != nil { - return nil, err + if cfg.SortOrder.Fields == nil && cfg.SortOrder.OrderID == 0 { + cfg.SortOrder = table.UnsortedSortOrder } payload := createTableRequest{ Name: tbl, - Schema: freshSchema, + Schema: schema, Location: cfg.Location, - PartitionSpec: &freshPartitionSpec, - WriteOrder: &freshSortOrder, + PartitionSpec: cfg.PartitionSpec, + WriteOrder: &cfg.SortOrder, StageCreate: false, Props: cfg.Properties, } diff --git a/partitions.go b/partitions.go index 03cb3839..db66abd2 100644 --- a/partitions.go +++ b/partitions.go @@ -19,6 +19,7 @@ package iceberg import ( "encoding/json" + "errors" "fmt" "iter" "net/url" @@ -30,6 +31,7 @@ import ( const ( PartitionDataIDStart = 1000 InitialPartitionSpecID = 0 + unassignedFieldID = 0 ) // UnpartitionedSpec is the default unpartitioned spec which can @@ -85,10 +87,194 @@ type PartitionSpec struct { sourceIdToFields map[int][]PartitionField } +type PartitionOption func(*PartitionSpec) error + +// BindToSchema creates a new PartitionSpec by copying the fields from the +// existing spec verifying compatibility with the schema. +// +// If newSpecID is not nil, it will be used as the spec id for the new spec. +// Otherwise, the existing spec id will be used. 
+// If a field in the spec is incompatible with the schema, an error will be +// returned. +func (p *PartitionSpec) BindToSchema(schema *Schema, lastPartitionID *int, newSpecID *int, isUnbound bool) (PartitionSpec, error) { + opts := make([]PartitionOption, 0) + if newSpecID != nil { + opts = append(opts, WithSpecID(*newSpecID)) + } else { + opts = append(opts, WithSpecID(p.id)) + } + + for field := range p.Fields() { + opts = append(opts, AddPartitionFieldBySourceID(field.SourceID, field.Name, field.Transform, schema, &field.FieldID)) + } + + freshSpec, err := NewPartitionSpecOpts(opts...) + if err != nil { + return PartitionSpec{}, err + } + if err = freshSpec.assignPartitionFieldIds(lastPartitionID); err != nil { + return PartitionSpec{}, err + } + + return freshSpec, err +} + +func NewPartitionSpecOpts(opts ...PartitionOption) (PartitionSpec, error) { + spec := PartitionSpec{ + id: 0, + } + for _, opt := range opts { + if err := opt(&spec); err != nil { + return PartitionSpec{}, err + } + } + spec.initialize() + + return spec, nil +} + +func WithSpecID(id int) PartitionOption { + return func(p *PartitionSpec) error { + p.id = id + + return nil + } +} + +func AddPartitionFieldByName(sourceName string, targetName string, transform Transform, schema *Schema, fieldID *int) PartitionOption { + return func(p *PartitionSpec) error { + if schema == nil { + return errors.New("cannot add partition field with nil schema") + } + field, ok := schema.FindFieldByName(sourceName) + + if !ok { + return fmt.Errorf("cannot find source column with name: %s in schema", sourceName) + } + err := p.addSpecFieldInternal(targetName, field, transform, fieldID) + if err != nil { + return err + } + + return nil + } +} + +func AddPartitionFieldBySourceID(sourceID int, targetName string, transform Transform, schema *Schema, fieldID *int) PartitionOption { + return func(p *PartitionSpec) error { + if schema == nil { + return errors.New("cannot add partition field with nil schema") + } + 
field, ok := schema.FindFieldByID(sourceID) + if !ok { + return fmt.Errorf("cannot find source column with id: %d in schema", sourceID) + } + err := p.addSpecFieldInternal(targetName, field, transform, fieldID) + if err != nil { + return err + } + + return nil + } +} + +func (p *PartitionSpec) addSpecFieldInternal(targetName string, field NestedField, transform Transform, fieldID *int) error { + if targetName == "" { + return errors.New("cannot use empty partition name") + } + for _, existingField := range p.fields { + if existingField.Name == targetName { + return errors.New("duplicate partition name: " + targetName) + } + } + var fieldIDValue int + if fieldID == nil { + fieldIDValue = unassignedFieldID + } else { + fieldIDValue = *fieldID + } + if err := p.checkForRedundantPartitions(field.ID, transform); err != nil { + return err + } + unboundField := PartitionField{ + SourceID: field.ID, + FieldID: fieldIDValue, + Name: targetName, + Transform: transform, + } + p.fields = append(p.fields, unboundField) + + return nil +} + +func (p *PartitionSpec) checkForRedundantPartitions(sourceID int, transform Transform) error { + if fields, ok := p.sourceIdToFields[sourceID]; ok { + for _, f := range fields { + if f.Transform.Equals(transform) { + return fmt.Errorf("cannot add redundant partition with source id %d and transform %s. A partition with the same source id and transform already exists with name: %s", + sourceID, + transform, + f.Name) + } + } + } + + return nil +} + +func (p *PartitionSpec) Len() int { + return len(p.fields) +} + +func (ps *PartitionSpec) assignPartitionFieldIds(lastAssignedFieldIDPtr *int) error { + // This is set_field_ids from iceberg-rust + // Already assigned partition ids. If we see one of these during iteration, + // we skip it. 
+ assignedIds := make(map[int]struct{}) + for _, field := range ps.fields { + if field.FieldID != unassignedFieldID { + if _, exists := assignedIds[field.FieldID]; exists { + return fmt.Errorf("duplicate field ID provided: %d", field.FieldID) + } + assignedIds[field.FieldID] = struct{}{} + } + } + + lastAssignedFieldID := ps.LastAssignedFieldID() + if lastAssignedFieldIDPtr != nil { + lastAssignedFieldID = *lastAssignedFieldIDPtr + } + for i := range ps.fields { + if ps.fields[i].FieldID == unassignedFieldID { + // Find the next available ID by incrementing from the last known good ID. + lastAssignedFieldID++ + for { + if _, exists := assignedIds[lastAssignedFieldID]; !exists { + break // Found an unused ID. + } + lastAssignedFieldID++ + } + + // Assign the new ID and immediately record it as used. + ps.fields[i].FieldID = lastAssignedFieldID + } else { + lastAssignedFieldID = max(lastAssignedFieldID, ps.fields[i].FieldID) + } + } + + return nil +} + +// NewPartitionSpec creates a new PartitionSpec with the given fields. +// +// The fields are not verified against a schema, use NewPartitionSpecOpts if you have to ensure compatibility. func NewPartitionSpec(fields ...PartitionField) PartitionSpec { return NewPartitionSpecID(InitialPartitionSpecID, fields...) } +// NewPartitionSpecID creates a new PartitionSpec with the given fields and id. +// +// The fields are not verified against a schema, use NewPartitionSpecOpts if you have to ensure compatibility. func NewPartitionSpecID(id int, fields ...PartitionField) PartitionSpec { ret := PartitionSpec{id: id, fields: fields} ret.initialize() @@ -122,6 +308,10 @@ func (ps PartitionSpec) Equals(other PartitionSpec) bool { // Fields returns a clone of the partition fields in this spec. 
func (ps *PartitionSpec) Fields() iter.Seq[PartitionField] { + if ps.fields == nil { + return slices.Values([]PartitionField{}) + } + return slices.Values(ps.fields) } @@ -209,6 +399,11 @@ func (ps *PartitionSpec) LastAssignedFieldID() int { } } + if id == unassignedFieldID { + // If no fields have been assigned an ID, return the default starting ID. + return PartitionDataIDStart - 1 + } + return id } @@ -263,37 +458,6 @@ func (ps *PartitionSpec) PartitionToPath(data structLike, sc *Schema) string { return path.Join(segments...) } -// AssignFreshPartitionSpecIDs creates a new PartitionSpec by reassigning the field IDs -// from the old schema to the corresponding fields in the fresh schema, while re-assigning -// the actual Spec IDs to 1000 + the position of the field in the partition spec. -func AssignFreshPartitionSpecIDs(spec *PartitionSpec, old, fresh *Schema) (PartitionSpec, error) { - if spec == nil { - return PartitionSpec{}, nil - } - - newFields := make([]PartitionField, 0, len(spec.fields)) - for pos, field := range spec.fields { - origCol, ok := old.FindColumnName(field.SourceID) - if !ok { - return PartitionSpec{}, fmt.Errorf("could not find field in old schema: %s", field.Name) - } - - freshField, ok := fresh.FindFieldByName(origCol) - if !ok { - return PartitionSpec{}, fmt.Errorf("could not find field in fresh schema: %s", field.Name) - } - - newFields = append(newFields, PartitionField{ - Name: field.Name, - SourceID: freshField.ID, - FieldID: PartitionDataIDStart + pos, - Transform: field.Transform, - }) - } - - return NewPartitionSpec(newFields...), nil -} - // GeneratePartitionFieldName returns default partition field name based on field transform type // // The default names are aligned with other client implementations diff --git a/table/arrow_utils_internal_test.go b/table/arrow_utils_internal_test.go index b39dfa28..7a19d627 100644 --- a/table/arrow_utils_internal_test.go +++ b/table/arrow_utils_internal_test.go @@ -44,6 +44,7 @@ func 
constructTestTable(t *testing.T, writeStats []string) (*metadata.FileMetaDa "last-column-id": 7, "current-schema-id": 0, "last-updated-ms": -1, + "last-partition-id": 0, "schemas": [ { "type": "struct", diff --git a/table/internal/parquet_files_test.go b/table/internal/parquet_files_test.go index cde72d8b..ab134c46 100644 --- a/table/internal/parquet_files_test.go +++ b/table/internal/parquet_files_test.go @@ -68,6 +68,7 @@ func constructTestTablePrimitiveTypes(t *testing.T) (*metadata.FileMetaData, tab ] } ], + "last-partition-id": 0, "last-updated-ms": -1, "default-spec-id": 0, "partition-specs": [{"spec-id": 0, "fields": []}], diff --git a/table/metadata.go b/table/metadata.go index ff3e9a0b..8da21a70 100644 --- a/table/metadata.go +++ b/table/metadata.go @@ -163,7 +163,8 @@ type MetadataBuilder struct { // >v1 specific lastSequenceNumber *int64 // update tracking - lastAddedSchemaID *int + lastAddedSchemaID *int + lastAddedPartitionID *int } func NewMetadataBuilder() (*MetadataBuilder, error) { @@ -284,15 +285,35 @@ func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema) (*MetadataBuilder, e } func (b *MetadataBuilder) AddPartitionSpec(spec *iceberg.PartitionSpec, initial bool) (*MetadataBuilder, error) { - for _, s := range b.specs { - if s.ID() == spec.ID() && !initial { - return nil, fmt.Errorf("partition spec with id %d already exists", spec.ID()) + newSpecID := b.reuseOrCreateNewPartitionSpecID(*spec) + + freshSpec, err := spec.BindToSchema(b.CurrentSchema(), b.lastPartitionID, &newSpecID, false) + if err != nil { + return nil, err + } + + spec = nil + if _, err := b.GetSpecByID(newSpecID); err == nil { + if b.lastAddedPartitionID == nil || *b.lastAddedPartitionID != newSpecID { + b.updates = append(b.updates, NewAddPartitionSpecUpdate(&freshSpec, initial)) + b.lastAddedPartitionID = &newSpecID } + + return b, nil } maxFieldID := 0 - for f := range spec.Fields() { + fieldCount := 0 + for f := range freshSpec.Fields() { maxFieldID = max(maxFieldID, 
f.FieldID) + if b.formatVersion <= 1 { + expectedID := partitionFieldStartID + fieldCount + if f.FieldID != expectedID { + return nil, fmt.Errorf("v1 constraint: partition field IDs are not sequential: expected %d, got %d", expectedID, f.FieldID) + } + fieldCount++ + } + } prev := partitionFieldStartID - 1 @@ -303,14 +324,15 @@ func (b *MetadataBuilder) AddPartitionSpec(spec *iceberg.PartitionSpec, initial var specs []iceberg.PartitionSpec if initial { - specs = []iceberg.PartitionSpec{*spec} + specs = []iceberg.PartitionSpec{freshSpec} } else { - specs = append(b.specs, *spec) + specs = append(b.specs, freshSpec) } b.specs = specs b.lastPartitionID = &lastPartitionID - b.updates = append(b.updates, NewAddPartitionSpecUpdate(spec, initial)) + b.lastAddedPartitionID = &newSpecID + b.updates = append(b.updates, NewAddPartitionSpecUpdate(&freshSpec, initial)) return b, nil } @@ -439,13 +461,12 @@ func (b *MetadataBuilder) SetDefaultSortOrderID(defaultSortOrderID int) (*Metada } func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID int) (*MetadataBuilder, error) { + lastUsed := false if defaultSpecID == -1 { - defaultSpecID = maxBy(b.specs, func(s iceberg.PartitionSpec) int { - return s.ID() - }) - if !slices.ContainsFunc(b.updates, func(u Update) bool { - return u.Action() == UpdateAddSpec && u.(*addPartitionSpecUpdate).Spec.ID() == defaultSpecID - }) { + if b.lastAddedPartitionID != nil { + lastUsed = true + defaultSpecID = *b.lastAddedPartitionID + } else { return nil, errors.New("can't set default spec to last added with no added partition specs") } } @@ -458,7 +479,11 @@ func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID int) (*MetadataBuilder, return nil, fmt.Errorf("can't set default spec to spec with id %d: %w", defaultSpecID, err) } - b.updates = append(b.updates, NewSetDefaultSpecUpdate(defaultSpecID)) + if lastUsed { + b.updates = append(b.updates, NewSetDefaultSpecUpdate(-1)) + } else { + b.updates = append(b.updates, 
NewSetDefaultSpecUpdate(defaultSpecID)) + } b.defaultSpecID = defaultSpecID return b, nil @@ -793,6 +818,20 @@ func (b *MetadataBuilder) Build() (Metadata, error) { } } +func (b *MetadataBuilder) reuseOrCreateNewPartitionSpecID(newSpec iceberg.PartitionSpec) int { + newSpecID := 0 + for _, spec := range b.specs { + if spec.Equals(newSpec) { + return spec.ID() + } + if spec.ID() >= newSpecID { + newSpecID = spec.ID() + 1 + } + } + + return newSpecID +} + func (b *MetadataBuilder) reuseOrCreateNewSchemaID(newSchema *iceberg.Schema) int { newSchemaID := newSchema.ID for _, schema := range b.schemaList { @@ -807,6 +846,31 @@ func (b *MetadataBuilder) reuseOrCreateNewSchemaID(newSchema *iceberg.Schema) in return newSchemaID } +func (b *MetadataBuilder) RemovePartitionSpecs(ints []int) (*MetadataBuilder, error) { + if slices.Contains(ints, b.defaultSpecID) { + return nil, fmt.Errorf("can't remove default partition spec with id %d", b.defaultSpecID) + } + + newSpecs := make([]iceberg.PartitionSpec, 0, len(b.specs)-len(ints)) + removed := make([]int, len(ints)) + for _, spec := range b.specs { + if slices.Contains(ints, spec.ID()) { + removed = append(removed, spec.ID()) + + continue + } + newSpecs = append(newSpecs, spec) + } + + b.specs = newSpecs + + if len(removed) != 0 { + b.updates = append(b.updates, NewRemoveSpecUpdate(ints)) + } + + return b, nil +} + // maxBy returns the maximum value of extract(e) for all e in elems. // If elems is empty, returns 0. 
func maxBy[S ~[]E, E any](elems S, extract func(e E) int) int { @@ -852,9 +916,9 @@ func ParseMetadataBytes(b []byte) (Metadata, error) { var ret Metadata switch ver.FormatVersion { case 1: - ret = &metadataV1{} + ret = initMetadataV1Deser() case 2: - ret = &metadataV2{} + ret = initMetadataV2Deser() default: return nil, ErrInvalidMetadataFormatVersion } @@ -890,6 +954,15 @@ type commonMetadata struct { SnapshotRefs map[string]SnapshotRef `json:"refs,omitempty"` } +func initCommonMetadataForDeserialization() commonMetadata { + return commonMetadata{ + LastUpdatedMS: -1, + LastColumnId: -1, + CurrentSchemaID: -1, + DefaultSpecID: -1, + } +} + func (c *commonMetadata) Ref() SnapshotRef { return c.SnapshotRefs[MainBranch] } func (c *commonMetadata) Refs() iter.Seq2[string, SnapshotRef] { return maps.All(c.SnapshotRefs) } func (c *commonMetadata) SnapshotLogs() iter.Seq[SnapshotLogEntry] { @@ -1131,6 +1204,12 @@ func (c *commonMetadata) validate() error { case c.LastColumnId < 0: // last-column-id is required return fmt.Errorf("%w: missing last-column-id", ErrInvalidMetadata) + case c.CurrentSchemaID < 0: + return fmt.Errorf("%w: no valid schema configuration found in table metadata", ErrInvalidMetadata) + case c.LastPartitionID == nil: + if c.FormatVersion > 1 { + return fmt.Errorf("%w: last-partition-id must be set for FormatVersion > 1", ErrInvalidMetadata) + } } return nil @@ -1156,6 +1235,12 @@ type metadataV1 struct { commonMetadata } +func initMetadataV1Deser() *metadataV1 { + return &metadataV1{ + commonMetadata: initCommonMetadataForDeserialization(), + } +} + func (m *metadataV1) LastSequenceNumber() int64 { return 0 } func (m *metadataV1) Equals(other Metadata) bool { @@ -1184,6 +1269,10 @@ func (m *metadataV1) preValidate() { m.DefaultSpecID = m.Specs[0].ID() } + if m.DefaultSpecID == -1 && len(m.Specs) > 0 { + m.DefaultSpecID = maxBy(m.Specs, func(s iceberg.PartitionSpec) int { return s.ID() }) + } + if m.LastPartitionID == nil { id := 
m.Specs[0].LastAssignedFieldID() for _, spec := range m.Specs[1:] { @@ -1213,6 +1302,16 @@ func (m *metadataV1) UnmarshalJSON(b []byte) error { return err } + // CurrentSchemaID was optional in v1, it can also be expressed via Schema. + if aux.CurrentSchemaID == -1 && aux.Schema != nil { + aux.CurrentSchemaID = aux.Schema.ID + if !slices.ContainsFunc(aux.SchemaList, func(s *iceberg.Schema) bool { + return s.Equals(aux.Schema) && s.ID == aux.CurrentSchemaID + }) { + aux.SchemaList = append(aux.SchemaList, aux.Schema) + } + } + m.preValidate() return m.validate() @@ -1234,6 +1333,13 @@ type metadataV2 struct { commonMetadata } +func initMetadataV2Deser() *metadataV2 { + return &metadataV2{ + LastSeqNum: -1, + commonMetadata: initCommonMetadataForDeserialization(), + } +} + func (m *metadataV2) LastSequenceNumber() int64 { return m.LastSeqNum } func (m *metadataV2) Equals(other Metadata) bool { @@ -1278,63 +1384,117 @@ func NewMetadata(sc *iceberg.Schema, partitions *iceberg.PartitionSpec, sortOrde // NewMetadataWithUUID is like NewMetadata, but allows the caller to specify the UUID of the table rather than creating a new one. 
func NewMetadataWithUUID(sc *iceberg.Schema, partitions *iceberg.PartitionSpec, sortOrder SortOrder, location string, props iceberg.Properties, tableUuid uuid.UUID) (Metadata, error) { - freshSchema, err := iceberg.AssignFreshSchemaIDs(sc, nil) + if tableUuid == uuid.Nil { + tableUuid = uuid.New() + } + var err error + formatVersion := DefaultFormatVersion + if props != nil { + verStr, ok := props["format-version"] + if ok { + if formatVersion, err = strconv.Atoi(verStr); err != nil { + formatVersion = DefaultFormatVersion + } + delete(props, "format-version") + } + } + + reassignedIds, err := reassignIDs(sc, partitions, sortOrder) if err != nil { return nil, err } - freshPartitions, err := iceberg.AssignFreshPartitionSpecIDs(partitions, sc, freshSchema) + builder, err := NewMetadataBuilder() if err != nil { return nil, err } - freshSortOrder, err := AssignFreshSortOrderIDs(sortOrder, sc, freshSchema) + _, err = builder.SetFormatVersion(formatVersion) if err != nil { return nil, err } - if tableUuid == uuid.Nil { - tableUuid = uuid.New() + _, err = builder.SetUUID(tableUuid) + if err != nil { + return nil, err } - formatVersion := DefaultFormatVersion - if props != nil { - verStr, ok := props["format-version"] - if ok { - if formatVersion, err = strconv.Atoi(verStr); err != nil { - formatVersion = DefaultFormatVersion - } - delete(props, "format-version") + _, err = builder.AddSortOrder(&reassignedIds.sortOrder, true) + if err != nil { + return nil, err + } + + _, err = builder.SetDefaultSortOrderID(-1) + if err != nil { + return nil, err + } + + _, err = builder.AddSchema(reassignedIds.schema) + if err != nil { + return nil, err + } + + _, err = builder.SetCurrentSchemaID(-1) + if err != nil { + return nil, err + } + + _, err = builder.AddPartitionSpec(reassignedIds.partitionSpec, true) + if err != nil { + return nil, err + } + + _, err = builder.SetLoc(location) + if err != nil { + return nil, err + } + + _, err = builder.SetProperties(props) + if err != nil { + 
return nil, err + } + + return builder.Build() +} + +type ReassignedIds struct { + schema *iceberg.Schema + partitionSpec *iceberg.PartitionSpec + sortOrder SortOrder +} + +func reassignIDs(sc *iceberg.Schema, partitions *iceberg.PartitionSpec, sortOrder SortOrder) (*ReassignedIds, error) { + previousMapFn := sc.FindColumnName + freshSc, err := iceberg.AssignFreshSchemaIDs(sc, nil) + if err != nil { + return nil, err + } + + if partitions == nil { + partitions = iceberg.UnpartitionedSpec + } + opts := make([]iceberg.PartitionOption, 0) + for f := range partitions.Fields() { + var s string + var ok bool + if s, ok = previousMapFn(f.SourceID); !ok { + return nil, fmt.Errorf("field %d not found in schema", f.FieldID) } + opts = append(opts, iceberg.AddPartitionFieldByName(s, f.Name, f.Transform, freshSc, nil)) + } + freshPartitions, err := iceberg.NewPartitionSpecOpts(opts...) + if err != nil { + return nil, err } - lastPartitionID := freshPartitions.LastAssignedFieldID() - common := commonMetadata{ - LastUpdatedMS: time.Now().UnixMilli(), - LastColumnId: freshSchema.HighestFieldID(), - FormatVersion: formatVersion, - UUID: tableUuid, - Loc: location, - SchemaList: []*iceberg.Schema{freshSchema}, - CurrentSchemaID: freshSchema.ID, - Specs: []iceberg.PartitionSpec{freshPartitions}, - DefaultSpecID: freshPartitions.ID(), - LastPartitionID: &lastPartitionID, - Props: props, - SortOrderList: []SortOrder{freshSortOrder}, - DefaultSortOrderID: freshSortOrder.OrderID, - } - - switch formatVersion { - case 1: - return &metadataV1{ - commonMetadata: common, - Schema: freshSchema, - Partition: slices.Collect(freshPartitions.Fields()), - }, nil - case 2: - return &metadataV2{commonMetadata: common}, nil - default: - return nil, fmt.Errorf("invalid format version: %d", formatVersion) + freshSortOrder, err := AssignFreshSortOrderIDs(sortOrder, sc, freshSc) + if err != nil { + return nil, err } + + return &ReassignedIds{ + schema: freshSc, + partitionSpec: &freshPartitions, + 
sortOrder: freshSortOrder, + }, nil } diff --git a/table/metadata_builder_internal_test.go b/table/metadata_builder_internal_test.go index 229eab04..1fc900e4 100644 --- a/table/metadata_builder_internal_test.go +++ b/table/metadata_builder_internal_test.go @@ -18,9 +18,11 @@ package table import ( + "fmt" "testing" "github.com/apache/iceberg-go" + "github.com/davecgh/go-spew/spew" "github.com/stretchr/testify/require" ) @@ -74,14 +76,19 @@ func builderWithoutChanges(formatVersion int) MetadataBuilder { if err != nil { panic(err) } - _, err = builder.AddPartitionSpec(&partitionSpec, true) + _, err = builder.AddSchema(&tableSchema) if err != nil { panic(err) } - _, err = builder.AddSchema(&tableSchema) + _, err = builder.SetCurrentSchemaID(-1) if err != nil { panic(err) } + _, err = builder.AddPartitionSpec(&partitionSpec, true) + if err != nil { + panic(err) + } + meta, err := builder.Build() if err != nil { panic(err) @@ -94,6 +101,138 @@ func builderWithoutChanges(formatVersion int) MetadataBuilder { return *builder } +func TestAddRemovePartitionSpec(t *testing.T) { + builder := builderWithoutChanges(2) + builderRef := &builder + i := 1000 + addedSpec, err := iceberg.NewPartitionSpecOpts( + iceberg.WithSpecID(10), + iceberg.AddPartitionFieldBySourceID(2, "y", iceberg.IdentityTransform{}, builder.schemaList[0], &i), + iceberg.AddPartitionFieldBySourceID(3, "z", iceberg.IdentityTransform{}, builder.schemaList[0], nil)) + require.NoError(t, err) + + builderRef, err = builderRef.AddPartitionSpec(&addedSpec, false) + require.NoError(t, err) + metadata, err := builderRef.Build() + require.NoError(t, err) + require.NotNil(t, metadata) + + i2 := 1001 + expectedSpec, err := iceberg.NewPartitionSpecOpts( + iceberg.WithSpecID(1), + iceberg.AddPartitionFieldBySourceID(2, "y", iceberg.IdentityTransform{}, builder.schemaList[0], &i), + iceberg.AddPartitionFieldBySourceID(3, "z", iceberg.IdentityTransform{}, builder.schemaList[0], &i2)) + require.NoError(t, err) + 
require.Equal(t, metadata.DefaultPartitionSpec(), 0) + require.Equal(t, *metadata.LastPartitionSpecID(), i2) + found := false + for _, part := range metadata.PartitionSpecs() { + if part.ID() == 1 { + found = true + require.True(t, part.Equals(expectedSpec), "expected partition spec to match added spec") + } + } + require.True(t, found, "expected partition spec to be added") + + newBuilder, err := MetadataBuilderFromBase(metadata) + require.NoError(t, err) + // Remove the spec + newBuilderRef, err := newBuilder.RemovePartitionSpecs([]int{1}) + require.NoError(t, err) + newBuild, err := newBuilderRef.Build() + require.NoError(t, err) + require.NotNil(t, newBuild) + require.Len(t, newBuilder.updates, 1) + require.Len(t, newBuild.PartitionSpecs(), 1) + _, err = newBuilder.GetSpecByID(1) + require.ErrorContains(t, err, "partition spec with id 1 not found") +} + +func TestSetDefaultPartitionSpec(t *testing.T) { + builder := builderWithoutChanges(2) + curSchema, err := builder.GetSchemaByID(builder.currentSchemaID) + require.NoError(t, err) + // Add a partition spec + addedSpec, err := iceberg.NewPartitionSpecOpts( + iceberg.WithSpecID(10), + iceberg.AddPartitionFieldBySourceID(1, "y_bucket[2]", iceberg.BucketTransform{NumBuckets: 2}, curSchema, nil)) + require.NoError(t, err) + builderRef, err := builder.AddPartitionSpec(&addedSpec, false) + require.NoError(t, err) + // Set the default partition spec + builderRef, err = builderRef.SetDefaultSpecID(-1) + require.NoError(t, err) + + id := 1001 + expectedSpec, err := iceberg.NewPartitionSpecOpts( + iceberg.WithSpecID(1), + iceberg.AddPartitionFieldBySourceID(1, "y_bucket[2]", iceberg.BucketTransform{NumBuckets: 2}, curSchema, &id)) + require.NoError(t, err) + require.True(t, builderRef.HasChanges()) + require.Equal(t, len(builderRef.updates), 2) + require.True(t, builderRef.updates[0].(*addPartitionSpecUpdate).Spec.Equals(expectedSpec)) + require.Equal(t, -1, builderRef.updates[1].(*setDefaultSpecUpdate).SpecID) + 
metadata, err := builderRef.Build() + require.NoError(t, err) + require.NotNil(t, metadata) + + require.Equal(t, metadata.DefaultPartitionSpec(), 1) + require.True(t, expectedSpec.Equals(metadata.PartitionSpec()), fmt.Sprintf("expected partition spec to match added spec %s, %s", spew.Sdump(expectedSpec), spew.Sdump(metadata.PartitionSpec()))) + require.Equal(t, *metadata.LastPartitionSpecID(), 1001) +} + +func TestSetExistingDefaultPartitionSpec(t *testing.T) { + builder := builderWithoutChanges(2) + curSchema, err := builder.GetSchemaByID(builder.currentSchemaID) + require.NoError(t, err) + + addedSpec, err := iceberg.NewPartitionSpecOpts( + iceberg.WithSpecID(10), + iceberg.AddPartitionFieldBySourceID(1, "y_bucket[2]", iceberg.BucketTransform{NumBuckets: 2}, curSchema, nil)) + require.NoError(t, err) + + id := 1001 + expectedSpec, err := iceberg.NewPartitionSpecOpts(iceberg.WithSpecID(1), iceberg.AddPartitionFieldBySourceID(1, "y_bucket[2]", iceberg.BucketTransform{NumBuckets: 2}, curSchema, &id)) + require.NoError(t, err) + + builderRef, err := builder.AddPartitionSpec(&addedSpec, false) + require.NoError(t, err) + + builderRef, err = builderRef.SetDefaultSpecID(-1) + require.NoError(t, err) + + require.True(t, builderRef.HasChanges()) + require.Len(t, builderRef.updates, 2) + require.True(t, builderRef.updates[0].(*addPartitionSpecUpdate).Spec.Equals(expectedSpec)) + require.Equal(t, -1, builderRef.updates[1].(*setDefaultSpecUpdate).SpecID) + + metadata, err := builderRef.Build() + require.NoError(t, err) + require.NotNil(t, metadata) + require.Equal(t, 1, metadata.DefaultPartitionSpec()) + + require.True(t, expectedSpec.Equals(metadata.PartitionSpec()), "expected partition spec to match added spec") + + newBuilder, err := MetadataBuilderFromBase(metadata) + require.NoError(t, err) + require.NotNil(t, newBuilder) + + newBuilderRef, err := newBuilder.SetDefaultSpecID(0) + require.NoError(t, err) + + require.True(t, newBuilderRef.HasChanges(), "expected changes 
after setting default spec") + require.Len(t, newBuilderRef.updates, 1, "expected one update") + require.Equal(t, 0, newBuilderRef.updates[0].(*setDefaultSpecUpdate).SpecID, "expected default partition spec to be set to 0") + + newBuild, err := newBuilderRef.Build() + require.NoError(t, err) + require.NotNil(t, newBuild) + require.Equal(t, 0, newBuild.DefaultPartitionSpec(), "expected default partition spec to be set to 0") + + newWithoutChanges := builderWithoutChanges(2) + require.True(t, newWithoutChanges.specs[0].Equals(newBuild.PartitionSpec()), "expected partition spec to match added spec") +} + func TestSetRef(t *testing.T) { builder := builderWithoutChanges(2) schemaID := 0 @@ -129,6 +268,21 @@ func TestSetRef(t *testing.T) { require.Len(t, builder.snapshotLog, 1) } +func TestAddPartitionSpecForV1RequiresSequentialIDs(t *testing.T) { + builder := builderWithoutChanges(1) + + // Add a partition spec with non-sequential IDs + id := 1000 + id2 := 1002 + addedSpec, err := iceberg.NewPartitionSpecOpts(iceberg.WithSpecID(10), + iceberg.AddPartitionFieldBySourceID(2, "y", iceberg.IdentityTransform{}, builder.CurrentSchema(), &id), + iceberg.AddPartitionFieldBySourceID(3, "z", iceberg.IdentityTransform{}, builder.CurrentSchema(), &id2)) + require.NoError(t, err) + + _, err = builder.AddPartitionSpec(&addedSpec, false) + require.ErrorContains(t, err, "v1 constraint: partition field IDs are not sequential: expected 1001, got 1002") +} + func TestSetBranchSnapshotCreatesBranchIfNotExists(t *testing.T) { builder := builderWithoutChanges(2) schemaID := 0 @@ -213,3 +367,10 @@ func TestRemoveMainSnapshotRef(t *testing.T) { require.NoError(t, err) require.NotNil(t, meta) } + +func TestDefaultSpecCannotBeRemoved(t *testing.T) { + builder := builderWithoutChanges(2) + + _, err := builder.RemovePartitionSpecs([]int{0}) + require.ErrorContains(t, err, "can't remove default partition spec with id 0") +} diff --git a/table/metadata_internal_test.go 
b/table/metadata_internal_test.go index e183fcf3..ef1fc6ac 100644 --- a/table/metadata_internal_test.go +++ b/table/metadata_internal_test.go @@ -902,6 +902,7 @@ func TestMetadataV2Validation(t *testing.T) { "last-sequence-number": 34, "current-schema-id": 0, "last-updated-ms": 1602638573590, + "last-partition-id": 1000, "schemas": [{"type":"struct","schema-id":0,"fields":[]}], "default-spec-id": 0, "partition-specs": [{"spec-id": 0, "fields": []}], @@ -917,6 +918,7 @@ func TestMetadataV2Validation(t *testing.T) { "last-updated-ms": 1602638573874, "last-column-id": 5, "current-schema-id": 0, + "last-partition-id": 1000, "schemas": [{"type":"struct","schema-id":0,"fields":[]}], "partition-specs": [{"spec-id": 0, "fields": []}], "properties": {}, @@ -933,6 +935,7 @@ func TestMetadataV2Validation(t *testing.T) { "current-schema-id": 0, "last-updated-ms": 1602638573590, "last-column-id": 0, + "last-partition-id": 1000, "schemas": [{"type":"struct","schema-id":0,"fields":[]}], "partition-specs": [{"spec-id": 0, "fields": []}], "sort-orders": [], @@ -952,6 +955,50 @@ func TestMetadataV2Validation(t *testing.T) { require.NoError(t, meta3.UnmarshalJSON([]byte(zeroColumnID))) } +func TestTableMetadataV1PartitionSpecsWithoutDefaultId(t *testing.T) { + // Deserialize the JSON - this should succeed by inferring default_spec_id as the max spec ID + meta, err := getTestTableMetadata("TableMetadataV1PartitionSpecsWithoutDefaultId.json") + require.NoError(t, err) + require.Equal(t, meta.Version(), 1) + require.Equal(t, meta.TableUUID(), uuid.MustParse("d20125c8-7284-442c-9aea-15fee620737c")) + require.Equal(t, meta.DefaultPartitionSpec(), 2) + require.Equal(t, len(meta.PartitionSpecs()), 2) + spec := meta.PartitionSpec() + require.Equal(t, spec.ID(), 2) + require.Equal(t, spec.NumFields(), 1) + require.Equal(t, spec.Field(0).Name, "y") + require.Equal(t, spec.Field(0).Transform, iceberg.IdentityTransform{}) + require.Equal(t, spec.Field(0).SourceID, 2) +} + +func 
TestTableMetadataV2MissingPartitionSpecs(t *testing.T) { + meta, err := getTestTableMetadata("TableMetadataV2MissingPartitionSpecs.json") + require.Error(t, err) + require.Nil(t, meta) + // TODO: check for specific error +} + +func TestTableMetadataV2MissingLastPartitionId(t *testing.T) { + // Similarly to above, this should fail but isn't since Go's lack of an Option type means it will just put a 0 for + // the missing lastPartitionId. + meta, err := getTestTableMetadata("TableMetadataV2MissingLastPartitionId.json") + require.Error(t, err) + require.Nil(t, meta) + // TODO: check for specific error +} + +func TestDefaultPartitionSpec(t *testing.T) { + defaultSpecID := 1234 + meta, err := getTestTableMetadata("TableMetadataV2Valid.json") + require.NoError(t, err) + spec := iceberg.NewPartitionSpecID(1234) + + meta.(*metadataV2).DefaultSpecID = spec.ID() + meta.(*metadataV2).Specs = append(meta.(*metadataV2).Specs, spec) + partitionSpec := meta.PartitionSpec() + require.Equal(t, partitionSpec.ID(), defaultSpecID) +} + func getTestTableMetadata(fileName string) (Metadata, error) { fCont, err := os.ReadFile(path.Join("testdata", fileName)) if err != nil { diff --git a/table/orphan_cleanup_integration_test.go b/table/orphan_cleanup_integration_test.go index fef31c9d..bc4d3883 100644 --- a/table/orphan_cleanup_integration_test.go +++ b/table/orphan_cleanup_integration_test.go @@ -53,16 +53,17 @@ const ( OrphanFilePrefix = "orphan_" ) -var ( - tableSchemaSimple = iceberg.NewSchemaWithIdentifiers(1, - []int{1}, - iceberg.NestedField{ - ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true}, - iceberg.NestedField{ - ID: 2, Name: "name", Type: iceberg.PrimitiveTypes.String, Required: true}, - iceberg.NestedField{ - ID: 3, Name: "value", Type: iceberg.PrimitiveTypes.Float64, Required: false}, - ) +var tableSchemaSimple = iceberg.NewSchemaWithIdentifiers(1, + []int{1}, + iceberg.NestedField{ + ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: 
true, + }, + iceberg.NestedField{ + ID: 2, Name: "name", Type: iceberg.PrimitiveTypes.String, Required: true, + }, + iceberg.NestedField{ + ID: 3, Name: "value", Type: iceberg.PrimitiveTypes.Float64, Required: false, + }, ) func (s *OrphanCleanupIntegrationSuite) loadCatalog() *rest.Catalog { diff --git a/table/testdata/TableMetadataV1PartitionSpecsWithoutDefaultId.json b/table/testdata/TableMetadataV1PartitionSpecsWithoutDefaultId.json new file mode 100644 index 00000000..6f4ed6a9 --- /dev/null +++ b/table/testdata/TableMetadataV1PartitionSpecsWithoutDefaultId.json @@ -0,0 +1,58 @@ +{ + "format-version": 1, + "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", + "location": "s3://bucket/test/location", + "last-updated-ms": 1602638573874, + "last-column-id": 3, + "schema": { + "type": "struct", + "fields": [ + { + "id": 1, + "name": "x", + "required": true, + "type": "long" + }, + { + "id": 2, + "name": "y", + "required": true, + "type": "long", + "doc": "comment" + }, + { + "id": 3, + "name": "z", + "required": true, + "type": "long" + } + ] + }, + "partition-specs": [ + { + "spec-id": 1, + "fields": [ + { + "name": "x", + "transform": "identity", + "source-id": 1, + "field-id": 1000 + } + ] + }, + { + "spec-id": 2, + "fields": [ + { + "name": "y", + "transform": "identity", + "source-id": 2, + "field-id": 1001 + } + ] + } + ], + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [] +} \ No newline at end of file diff --git a/table/testdata/TableMetadataV2MissingLastPartitionId.json b/table/testdata/TableMetadataV2MissingLastPartitionId.json new file mode 100644 index 00000000..31c2b4ca --- /dev/null +++ b/table/testdata/TableMetadataV2MissingLastPartitionId.json @@ -0,0 +1,73 @@ +{ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "s3://bucket/test/location", + "last-sequence-number": 34, + "last-updated-ms": 1602638573590, + "last-column-id": 3, + "current-schema-id": 0, + "schemas": [{ + "type": "struct", 
+ "schema-id": 0, + "fields": [ + { + "id": 1, + "name": "x", + "required": true, + "type": "long" + }, + { + "id": 2, + "name": "y", + "required": true, + "type": "long", + "doc": "comment" + }, + { + "id": 3, + "name": "z", + "required": true, + "type": "long" + } + ] + }], + "default-spec-id": 0, + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + { + "name": "x", + "transform": "identity", + "source-id": 1, + "field-id": 1000 + } + ] + } + ], + "default-sort-order-id": 3, + "sort-orders": [ + { + "order-id": 3, + "fields": [ + { + "transform": "identity", + "source-id": 2, + "direction": "asc", + "null-order": "nulls-first" + }, + { + "transform": "bucket[4]", + "source-id": 3, + "direction": "desc", + "null-order": "nulls-last" + } + ] + } + ], + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [], + "snapshot-log": [], + "metadata-log": [] +} \ No newline at end of file diff --git a/table/testdata/TableMetadataV2MissingPartitionSpecs.json b/table/testdata/TableMetadataV2MissingPartitionSpecs.json new file mode 100644 index 00000000..3ab0a7a1 --- /dev/null +++ b/table/testdata/TableMetadataV2MissingPartitionSpecs.json @@ -0,0 +1,67 @@ +{ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "s3://bucket/test/location", + "last-sequence-number": 34, + "last-updated-ms": 1602638573590, + "last-column-id": 3, + "current-schema-id": 0, + "schemas": [{ + "type": "struct", + "schema-id": 0, + "fields": [ + { + "id": 1, + "name": "x", + "required": true, + "type": "long" + }, + { + "id": 2, + "name": "y", + "required": true, + "type": "long", + "doc": "comment" + }, + { + "id": 3, + "name": "z", + "required": true, + "type": "long" + } + ] + }], + "partition-spec": [ + { + "name": "x", + "transform": "identity", + "source-id": 1, + "field-id": 1000 + } + ], + "default-sort-order-id": 3, + "sort-orders": [ + { + "order-id": 3, + "fields": [ + { + "transform": "identity", + "source-id": 2, + "direction": "asc", 
+ "null-order": "nulls-first" + }, + { + "transform": "bucket[4]", + "source-id": 3, + "direction": "desc", + "null-order": "nulls-last" + } + ] + } + ], + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [], + "snapshot-log": [], + "metadata-log": [] +} \ No newline at end of file diff --git a/table/transaction_test.go b/table/transaction_test.go index db11af9d..a0269cb3 100644 --- a/table/transaction_test.go +++ b/table/transaction_test.go @@ -189,9 +189,11 @@ func (s *SparkIntegrationTestSuite) TestDifferentDataTypes() { iceberg.NestedField{ID: 14, Name: "small_dec", Type: iceberg.DecimalTypeOf(8, 2)}, iceberg.NestedField{ID: 15, Name: "med_dec", Type: iceberg.DecimalTypeOf(16, 2)}, iceberg.NestedField{ID: 16, Name: "large_dec", Type: iceberg.DecimalTypeOf(24, 2)}, - iceberg.NestedField{ID: 17, Name: "list", Type: &iceberg.ListType{ - ElementID: 18, - Element: iceberg.PrimitiveTypes.Int32}, + iceberg.NestedField{ + ID: 17, Name: "list", Type: &iceberg.ListType{ + ElementID: 18, + Element: iceberg.PrimitiveTypes.Int32, + }, }, ) diff --git a/table/update_spec.go b/table/update_spec.go index df777d63..9447761d 100644 --- a/table/update_spec.go +++ b/table/update_spec.go @@ -115,7 +115,10 @@ func (us *UpdateSpec) BuildUpdates() ([]Update, []Requirement, error) { } } - newSpec := us.Apply() + newSpec, err := us.Apply() + if err != nil { + return nil, nil, err + } updates := make([]Update, 0) requirements := make([]Requirement, 0) @@ -133,7 +136,7 @@ func (us *UpdateSpec) BuildUpdates() ([]Update, []Requirement, error) { return updates, requirements, nil } -func (us *UpdateSpec) Apply() iceberg.PartitionSpec { +func (us *UpdateSpec) Apply() (iceberg.PartitionSpec, error) { partitionFields := make([]iceberg.PartitionField, 0) partitionNames := make(map[string]bool) spec := us.txn.tbl.Metadata().PartitionSpec() @@ -147,7 +150,7 @@ func (us *UpdateSpec) Apply() iceberg.PartitionSpec { newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, 
field.FieldID, field.Name, field.Transform, partitionNames) } if err != nil { - return iceberg.PartitionSpec{} + return iceberg.PartitionSpec{}, err } partitionFields = append(partitionFields, newField) } else if us.txn.tbl.Metadata().Version() == 1 { @@ -157,15 +160,22 @@ func (us *UpdateSpec) Apply() iceberg.PartitionSpec { newField, err = us.addNewField(us.txn.tbl.Schema(), field.SourceID, field.FieldID, field.Name, iceberg.VoidTransform{}, partitionNames) } if err != nil { - return iceberg.PartitionSpec{} + return iceberg.PartitionSpec{}, err } partitionFields = append(partitionFields, newField) } } partitionFields = append(partitionFields, us.adds...) + opts := make([]iceberg.PartitionOption, len(partitionFields)) + for i, field := range partitionFields { + opts[i] = iceberg.AddPartitionFieldBySourceID(field.SourceID, field.Name, field.Transform, us.txn.tbl.Schema(), &field.FieldID) + } - newSpec := iceberg.NewPartitionSpec(partitionFields...) + newSpec, err := iceberg.NewPartitionSpecOpts(opts...) + if err != nil { + return iceberg.PartitionSpec{}, err + } newSpecId := iceberg.InitialPartitionSpecID for _, spec = range us.txn.tbl.Metadata().PartitionSpecs() { if newSpec.CompatibleWith(&spec) { @@ -177,7 +187,7 @@ func (us *UpdateSpec) Apply() iceberg.PartitionSpec { } } - return iceberg.NewPartitionSpecID(newSpecId, partitionFields...) 
+ return iceberg.NewPartitionSpecID(newSpecId, partitionFields...), nil } func (us *UpdateSpec) Commit() error { diff --git a/table/update_spec_test.go b/table/update_spec_test.go index 0fc102aa..ea113111 100644 --- a/table/update_spec_test.go +++ b/table/update_spec_test.go @@ -75,7 +75,8 @@ func TestUpdateSpecAddField(t *testing.T) { assert.NotNil(t, updates) assert.NotNil(t, reqs) - newSpec := specUpdate.Apply() + newSpec, err := specUpdate.Apply() + assert.NoError(t, err) assert.NotNil(t, newSpec) assert.Equal(t, 1, newSpec.ID()) assert.Equal(t, 1003, newSpec.LastAssignedFieldID()) @@ -198,7 +199,8 @@ func TestUpdateSpecAddField(t *testing.T) { assert.NotNil(t, updates) assert.NotNil(t, reqs) - newSpec := specUpdate.Apply() + newSpec, err := specUpdate.Apply() + assert.NoError(t, err) assert.NotNil(t, newSpec) assert.Equal(t, "street_void_1001", newSpec.FieldsBySourceID(5)[0].Name) }) @@ -218,7 +220,8 @@ func TestUpdateSpecAddIdentityField(t *testing.T) { assert.NotNil(t, updates) assert.NotNil(t, reqs) - newSpec := specUpdate.Apply() + newSpec, err := specUpdate.Apply() + assert.NoError(t, err) assert.NotNil(t, newSpec) assert.Equal(t, 1, newSpec.ID()) assert.Equal(t, 1003, newSpec.LastAssignedFieldID()) @@ -254,7 +257,8 @@ func TestUpdateSpecRenameField(t *testing.T) { assert.NotNil(t, updates) assert.NotNil(t, reqs) - newSpec := updateSpec.Apply() + newSpec, err := updateSpec.Apply() + assert.NoError(t, err) assert.NotNil(t, newSpec) assert.Equal(t, 1, newSpec.ID()) assert.Equal(t, "new_id_identity", newSpec.FieldsBySourceID(1)[0].Name) @@ -318,7 +322,8 @@ func TestUpdateSpecRemoveField(t *testing.T) { assert.NotNil(t, updates) assert.NotNil(t, reqs) - newSpec := updateSpec.Apply() + newSpec, err := updateSpec.Apply() + assert.NoError(t, err) assert.NotNil(t, newSpec) assert.Equal(t, 1, newSpec.ID()) assert.Equal(t, 999, newSpec.LastAssignedFieldID()) diff --git a/table/updates.go b/table/updates.go index af6f139b..f91bd487 100644 --- a/table/updates.go +++ 
b/table/updates.go @@ -553,12 +553,12 @@ func (u *removeSnapshotRefUpdate) Apply(builder *MetadataBuilder) error { type removeSpecUpdate struct { baseUpdate - SpecIds []int64 `json:"spec-ids"` + SpecIds []int `json:"spec-ids"` } // NewRemoveSpecUpdate creates a new Update that removes a list of partition specs // from the table metadata. -func NewRemoveSpecUpdate(specIds []int64) *removeSpecUpdate { +func NewRemoveSpecUpdate(specIds []int) *removeSpecUpdate { return &removeSpecUpdate{ baseUpdate: baseUpdate{ActionName: UpdateRemoveSpec}, SpecIds: specIds, @@ -566,7 +566,9 @@ func NewRemoveSpecUpdate(specIds []int64) *removeSpecUpdate { } func (u *removeSpecUpdate) Apply(builder *MetadataBuilder) error { - return fmt.Errorf("%w: %s", iceberg.ErrNotImplemented, UpdateRemoveSpec) + _, err := builder.RemovePartitionSpecs(u.SpecIds) + + return err } type removeSchemasUpdate struct { diff --git a/table/updates_test.go b/table/updates_test.go index 0c5eeea1..c96a97c4 100644 --- a/table/updates_test.go +++ b/table/updates_test.go @@ -230,15 +230,3 @@ func TestRemoveSchemas(t *testing.T) { } }) } - -func TestRemovePartitionSpecs(t *testing.T) { - var builder *MetadataBuilder - removeSpecs := removeSpecUpdate{ - SpecIds: []int64{}, - } - t.Run("remove specs should fail", func(t *testing.T) { - if err := removeSpecs.Apply(builder); !errors.Is(err, iceberg.ErrNotImplemented) { - t.Fatalf("Expected unimplemented error, got %v", err) - } - }) -}