twuebi commented on code in PR #629: URL: https://github.com/apache/iceberg-go/pull/629#discussion_r2543694646
########## view/metadata_builder.go: ########## @@ -1,57 +1,468 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - package view import ( + "errors" + "fmt" + "maps" + "slices" + "strings" + "time" + "github.com/apache/iceberg-go" + + "github.com/google/uuid" ) -// MetadataBuilder is a struct used for building and updating Iceberg view metadata. 
-type MetadataBuilder struct{} +type MetadataBuilder struct { + base Metadata + updates Updates + + // common fields + formatVersion int + uuid uuid.UUID + loc string + schemaList []*iceberg.Schema + versionList []*Version + currentVersionID int64 + versionLog []VersionLogEntry + props iceberg.Properties + + // lookup maps + versionsById map[int64]*Version + schemasById map[int]*iceberg.Schema + + // update tracking + versionHistoryEntry *VersionLogEntry + lastAddedVersionID *int64 + lastAddedSchemaID *int + + // error tracking for build chaining + // if set, subsequent operations become a noop + err error +} + +func NewMetadataBuilder() (*MetadataBuilder, error) { + return &MetadataBuilder{ + formatVersion: DefaultViewFormatVersion, + updates: make([]Update, 0), + schemaList: make([]*iceberg.Schema, 0), + versionList: make([]*Version, 0), + versionLog: make([]VersionLogEntry, 0), + props: make(iceberg.Properties), + versionsById: make(map[int64]*Version), + schemasById: make(map[int]*iceberg.Schema), + }, nil +} + +func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) { + b := &MetadataBuilder{} + b.base = metadata + + // Copy fields + b.formatVersion = metadata.FormatVersion() + b.uuid = metadata.ViewUUID() + b.loc = metadata.Location() + b.schemaList = slices.Clone(metadata.Schemas()) + b.currentVersionID = metadata.CurrentVersionID() + b.versionList = cloneSlice(metadata.Versions()) + b.versionLog = slices.Clone(metadata.VersionLog()) + b.props = maps.Clone(metadata.Properties()) + + // Build lookup maps + b.versionsById = indexBy(b.versionList, func(vl *Version) int64 { return vl.VersionID }) + b.schemasById = indexBy(b.schemaList, func(sm *iceberg.Schema) int { return sm.ID }) + + return b, nil +} + +func (b *MetadataBuilder) HasChanges() bool { return len(b.updates) > 0 } -func (b *MetadataBuilder) AssignUUID(_ string) error { - return iceberg.ErrNotImplemented +func (b *MetadataBuilder) SetCurrentVersion(version *Version, schema 
*iceberg.Schema) *MetadataBuilder { + if b.err != nil { + return b + } + + newSchemaID, err := b.addSchema(schema) + if b.setErr(err) { + return b + } + + newVersion := version.Clone() + newVersion.SchemaID = newSchemaID + newVersionID, err := b.addVersion(newVersion) + if b.setErr(err) { + return b + } + + return b.SetCurrentVersionID(newVersionID) } -func (b *MetadataBuilder) UpgradeFormatVersion(_ int) error { - return iceberg.ErrNotImplemented +func (b *MetadataBuilder) AddVersion(newVersion *Version) *MetadataBuilder { + if b.err != nil { + return b + } + + _, err := b.addVersion(newVersion) + if b.setErr(err) { + return b + } + return b } -func (b *MetadataBuilder) AddSchema(_ *iceberg.Schema) error { - return iceberg.ErrNotImplemented +func (b *MetadataBuilder) addVersion(newVersion *Version) (int64, error) { + newVersionID := b.reuseOrCreateNewVersionID(newVersion) + version := newVersion.Clone() + if newVersionID != version.VersionID { + version.VersionID = newVersionID + } + + // Check if this version was added in an update already + if _, ok := b.versionsById[newVersionID]; ok { + for _, upd := range b.updates { + if vvu, ok := upd.(*addViewVersionUpdate); ok && vvu.Version.VersionID == newVersionID { + b.lastAddedVersionID = &newVersionID + return newVersionID, nil + } + } + } + + // If the SchemaID of the version is unset (nil), we attach the lastAddedSchemaID to it. 
+ // If we have no lastAddedSchemaID, we fail + if version.SchemaID == LastAddedID { + if b.lastAddedSchemaID == nil { + return 0, errors.New("cannot set last added schema: no schema has been added") + } + version.SchemaID = *b.lastAddedSchemaID + } + + if _, ok := b.schemasById[version.SchemaID]; !ok { + return 0, fmt.Errorf("cannot add version with unknown schema: %d", version.SchemaID) + } + + if len(version.Representations) == 0 { + return 0, fmt.Errorf("cannot add version with no representations") + } + dialects := make(map[string]struct{}) + for _, repr := range version.Representations { + normalizedDialect := strings.ToLower(repr.Dialect) + if _, ok := dialects[normalizedDialect]; ok { + return 0, fmt.Errorf("invalid view version: cannot add multiple queries for dialect %s", normalizedDialect) + } + dialects[normalizedDialect] = struct{}{} + } + + b.versionList = append(b.versionList, version) + b.versionsById[version.VersionID] = version + + if b.lastAddedSchemaID != nil && version.SchemaID == *b.lastAddedSchemaID { + updateVersion := version.Clone() + updateVersion.SchemaID = LastAddedID + b.updates = append(b.updates, NewAddViewVersionUpdate(updateVersion)) + } else { + b.updates = append(b.updates, NewAddViewVersionUpdate(version)) + } + + b.lastAddedVersionID = &newVersionID + return newVersionID, nil } -func (b *MetadataBuilder) SetLocation(_ string) error { - return iceberg.ErrNotImplemented +func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema) *MetadataBuilder { + if b.err != nil { + return b + } + + _, err := b.addSchema(schema) + if b.setErr(err) { + return b + } + return b } -func (b *MetadataBuilder) SetProperties(_ map[string]string) error { - return iceberg.ErrNotImplemented +func (b *MetadataBuilder) addSchema(schema *iceberg.Schema) (int, error) { + newSchemaID := b.reuseOrCreateNewSchemaID(schema) + + if _, ok := b.schemasById[newSchemaID]; ok { + return newSchemaID, nil + } + + newSchema := schema + // Build a fresh schema if we 
reset the ID + if schema.ID != newSchemaID { + newSchema = iceberg.NewSchemaWithIdentifiers(newSchemaID, schema.IdentifierFieldIDs, schema.Fields()...) + } + + b.schemaList = append(b.schemaList, newSchema) + b.schemasById[newSchema.ID] = newSchema + b.updates = append(b.updates, NewAddSchemaUpdate(newSchema)) + b.lastAddedSchemaID = &newSchemaID + + return newSchemaID, nil } -func (b *MetadataBuilder) RemoveProperties(_ []string) error { - return iceberg.ErrNotImplemented +func (b *MetadataBuilder) RemoveProperties(keys []string) *MetadataBuilder { + if b.err != nil { + return b + } + + if len(keys) == 0 { + return b + } + + b.updates = append(b.updates, NewRemovePropertiesUpdate(keys)) + for _, key := range keys { + delete(b.props, key) + } + + return b } -func (b *MetadataBuilder) AddVersion(_ *Version) error { - return iceberg.ErrNotImplemented +func (b *MetadataBuilder) SetCurrentVersionID(newVersionID int64) *MetadataBuilder { + if b.err != nil { + return b + } + + if newVersionID == LastAddedID { + if b.lastAddedVersionID == nil { + b.err = errors.New("cannot set current version to last added, no version has been added") + return b + } + newVersionID = *b.lastAddedVersionID + } + + if newVersionID == b.currentVersionID { + return b + } + + _, ok := b.versionsById[newVersionID] + if !ok { + b.err = fmt.Errorf("cannot set current version to unknown version with id %d", newVersionID) + return b + } + + if b.lastAddedVersionID != nil && *b.lastAddedVersionID == newVersionID { + b.updates = append(b.updates, NewSetCurrentVersionUpdate(LastAddedID)) + } else { + b.updates = append(b.updates, NewSetCurrentVersionUpdate(newVersionID)) + } + b.currentVersionID = newVersionID + + // Set the current history entry. + // If the version was added in the current set of changes, use its timestamp, + // otherwise use system time. 
+ updateTimestampMS := time.Now().UnixMilli() + for _, update := range b.updates { + if v, ok := update.(*addViewVersionUpdate); ok && v.Version.VersionID == newVersionID { + updateTimestampMS = v.Version.TimestampMS + break + } + } + + b.versionHistoryEntry = &VersionLogEntry{ + VersionID: newVersionID, + TimestampMS: updateTimestampMS, + } + + return b } -func (b *MetadataBuilder) SetCurrentVersionID(_ int64) error { - return iceberg.ErrNotImplemented +func (b *MetadataBuilder) SetFormatVersion(formatVersion int) *MetadataBuilder { + if b.err != nil { + return b + } + + if formatVersion < b.formatVersion { + b.err = fmt.Errorf("downgrading format version from %d to %d is not allowed", + b.formatVersion, formatVersion) + return b + } + + if formatVersion > SupportedViewFormatVersion { + b.err = fmt.Errorf("unsupported format version %d", formatVersion) + return b + } + + if formatVersion == b.formatVersion { + return b + } + + b.updates = append(b.updates, NewUpgradeFormatVersionUpdate(formatVersion)) + b.formatVersion = formatVersion + + return b +} + +func (b *MetadataBuilder) SetLoc(loc string) *MetadataBuilder { + if b.err != nil { + return b + } + + if b.loc == loc { + return b + } + + b.updates = append(b.updates, NewSetLocationUpdate(loc)) + b.loc = loc + + return b +} + +func (b *MetadataBuilder) SetProperties(props iceberg.Properties) *MetadataBuilder { + if b.err != nil { + return b + } + + if len(props) == 0 { + return b + } + + b.updates = append(b.updates, NewSetPropertiesUpdate(props)) + if b.props == nil { + b.props = props + } else { + maps.Copy(b.props, props) + } + + return b +} + +func (b *MetadataBuilder) SetUUID(newUUID uuid.UUID) *MetadataBuilder { + if b.err != nil { + return b + } + + if newUUID == uuid.Nil { + b.err = errors.New("cannot set uuid to null") + return b + } + + // Noop - same UUID does not generate a change. 
+ if b.uuid == newUUID { + return b + } + + if b.uuid != uuid.Nil { + b.err = errors.New("cannot reassign uuid") + return b + } + + b.updates = append(b.updates, NewAssignUUIDUpdate(newUUID)) + b.uuid = newUUID + + return b +} + +func (b *MetadataBuilder) Err() error { + return b.err +} + +func (b *MetadataBuilder) buildMetadata(withUpdates bool) (*metadata, error) { + uuid_ := b.uuid + if uuid_ == uuid.Nil { + uuid_ = uuid.New() + } + + md := &metadata{ + FormatVersionValue: b.formatVersion, + UUID: uuid_, + Loc: b.loc, + CurrentVersionIDValue: b.currentVersionID, + VersionList: b.versionList, + VersionLogList: b.versionLog, + SchemaList: b.schemaList, + Props: b.props, + } + if withUpdates { + md.updates = b.updates + } + md.init() + + return md, nil +} + +// build builds the view metadata updates from the builder +// if withUpdates is set to false, updates will not be included +// in the returned Metadata, thus its Updates() will return an empty slice +func (b *MetadataBuilder) build(withUpdates bool) (Metadata, error) { Review Comment: I'd keep it: these checks also apply to non-REST catalogs, where this may be the only place they are validated, so I'd rather not reimplement them in every catalog. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
