jnagel12 commented on code in PR #629:
URL: https://github.com/apache/iceberg-go/pull/629#discussion_r2542976387


##########
view/metadata.go:
##########
@@ -18,135 +18,523 @@
 package view
 
 import (
-       "context"
        "encoding/json"
        "errors"
        "fmt"
+       "io"
+       "maps"
+       "slices"
+       "strconv"
+       "strings"
+       "sync"
        "time"
 
        "github.com/apache/iceberg-go"
-       "github.com/apache/iceberg-go/internal"
-       "github.com/apache/iceberg-go/io"
+       iceinternal "github.com/apache/iceberg-go/internal"
+       "github.com/apache/iceberg-go/table"
+
        "github.com/google/uuid"
 )
 
-// LoadMetadata reads and parses a view metadata file from the specified 
location.
+var (
+       ErrInvalidViewMetadata              = errors.New("invalid view 
metadata")
+       ErrInvalidViewMetadataFormatVersion = errors.New("invalid or missing 
format-version in view metadata")
+)
+
+const (
+       // LastAddedID is used in place of ID fields (e.g. schema, version) to 
indicate that
+       // the last added instance of that type should be used.
+       LastAddedID                = -1
+       SupportedViewFormatVersion = 1
+       DefaultViewFormatVersion   = SupportedViewFormatVersion
+)
+
+const (
+       initialSchemaID = 0
+
+       viewEngineProperty = "engine-name"
+       defaultViewEngine  = "iceberg-go"
+)
+
+// Metadata for an iceberg view as specified in the Iceberg spec
+// https://iceberg.apache.org/view-spec/
+type Metadata interface {
+       // FormatVersion indicates the version of this metadata, 1 for V1
+       FormatVersion() int
+       // ViewUUID returns a UUID that identifies the view, generated when the
+       // view is created. Implementations must throw an exception if a view's
+       // UUID does not match the expected UUID after refreshing metadata.
+       ViewUUID() uuid.UUID
+       // Location is the table's base location. This is used by writers to 
determine
+       // where to store data files, manifest files, and table metadata files.
+       Location() string
+       // Schemas returns the list of view schemas
+       Schemas() []*iceberg.Schema
+       // CurrentVersionID returns the ID of the current version of the view 
(version-id)
+       CurrentVersionID() int64
+       // CurrentVersion returns the current version of the view
+       CurrentVersion() *Version
+       // CurrentSchemaID returns the ID of the current schema
+       CurrentSchemaID() int
+       // CurrentSchema returns the current schema of the view
+       CurrentSchema() *iceberg.Schema
+       // SchemasByID returns a map of schema IDs to schemas
+       SchemasByID() map[int]*iceberg.Schema
+       // Versions returns the list of view versions
+       Versions() []*Version
+       // VersionLog returns a list of version log entries
+       // with the timestamp and version-id for every change to 
current-version-id
+       VersionLog() []VersionLogEntry
+       // Properties is a string to string map of view properties.
+       Properties() iceberg.Properties
+
+       // Updates returns the list of metadata updates used to build this 
metadata
+       Updates() Updates
+
+       Equals(Metadata) bool
+}
+
+// VersionSummary is string to string map of summary metadata about a view's 
version
+type VersionSummary map[string]string
+
+// Representation is a struct containing information about a view's 
representation
+// https://iceberg.apache.org/view-spec/#sql-representation
+type Representation struct {
+       // Must be sql
+       Type string `json:"type"`
+       // A SQL SELECT statement
+       Sql string `json:"sql"`
+       // The dialect of the sql SELECT statement (e.g., "trino" or "spark")
+       Dialect string `json:"dialect"`
+}
+type Representations []Representation
+
+func NewRepresentation(sql string, dialect string) Representation {
+       return Representation{
+               Type:    "sql",
+               Sql:     sql,
+               Dialect: dialect,
+       }
+}
+
+type Version struct {
+       // ID for the version
+       VersionID int64 `json:"version-id"`
+       // ID of the schema for the view version
+       SchemaID int `json:"schema-id"`
+       // Timestamp when the version was created (ms from epoch)
+       TimestampMS int64 `json:"timestamp-ms"`
+       // A string to string map of summary metadata about the version
+       Summary VersionSummary `json:"summary"`
+       // A list of representations for the view definition
+       Representations []Representation `json:"representations"`
+       // The default view namespace (as a list of strings)
+       DefaultNamespace table.Identifier `json:"default-namespace"`
+       // An (optional) default catalog name for querying the view
+       DefaultCatalog string `json:"default-catalog,omitempty"`
+}
+
+type VersionOpt func(*Version)
+
+func WithVersionSummary(summary VersionSummary) VersionOpt {
+       return func(v *Version) {
+               if v.Summary == nil {
+                       v.Summary = summary
+               } else {
+                       maps.Copy(v.Summary, summary)
+               }
+       }
+}
+
+func WithDefaultViewCatalog(catalogName string) func(*Version) {
+       return func(version *Version) {
+               version.DefaultCatalog = catalogName
+       }
+}
+
+func WithTimestampMS(timestampMS int64) VersionOpt {
+       return func(version *Version) {
+               version.TimestampMS = timestampMS
+       }
+}
+
+// NewVersion creates a Version instance from the provided parameters.
+// If building updates using the MetadataBuilder, and one desires to use the 
last
+// added schema ID, one should use the LastAddedID constant as the provided 
schemaID
 //
-// Returns the loaded Metadata or an error if the file cannot be read or 
parsed.
-func LoadMetadata(ctx context.Context,
-       props iceberg.Properties,
-       metadataLocation string,
-       name string,
-       namespace string,
-) (_ Metadata, err error) {
-       fs, err := io.LoadFS(ctx, props, metadataLocation)
-       if err != nil {
-               return nil, fmt.Errorf("error loading view metadata: %w", err)
+// Note that NewVersion automatically seeds TimestampMS to 
time.Now().UnixMilli(),
+// and one should use the option WithTimestampMS to override this behavior.
+func NewVersion(id int64, schemaID int, representations []Representation, 
defaultNS table.Identifier, opts ...VersionOpt) (*Version, error) {
+       if id < 1 {
+               return nil, errors.New("id should be greater than 0")
        }
 
-       inputFile, err := fs.Open(metadataLocation)
-       if err != nil {
-               return nil, fmt.Errorf("%s.%s: error encountered loading view 
metadata: %w", namespace, name, err)
+       if len(representations) == 0 {
+               return nil, errors.New("invalid view version: must have at 
least one representation")
        }
-       defer internal.CheckedClose(inputFile, &err)
 
-       var m metadata
-       if err := json.NewDecoder(inputFile).Decode(&m); err != nil {
-               return nil, fmt.Errorf("error encountered decoding view 
metadata: %w", err)
+       version := &Version{
+               VersionID:        id,
+               SchemaID:         schemaID,
+               Representations:  representations,
+               Summary:          VersionSummary{viewEngineProperty: 
defaultViewEngine},
+               TimestampMS:      time.Now().UnixMilli(),
+               DefaultNamespace: defaultNS,
        }
 
-       return &m, nil
+       for _, opt := range opts {
+               opt(version)
+       }
+
+       return version, nil
 }
 
-// NewMetadata returns a view metadata for a given version.
-func NewMetadata(schema *iceberg.Schema,
-       version Version,
-       loc string,
-       props iceberg.Properties,
-) (Metadata, error) {
-       timestampMs := time.Now().UnixMilli()
+// NewVersionFromSQL creates a new Version with a single representation
+// using the provided SQL and dialect "default".
+func NewVersionFromSQL(id int64, schemaID int, sql string, defaultNS 
table.Identifier, opts ...VersionOpt) (*Version, error) {
+       return NewVersion(id, schemaID, Representations{NewRepresentation(sql, 
"default")}, defaultNS, opts...)
+}
 
-       if version.SchemaID != schema.ID {
-               return nil, fmt.Errorf("%w: version.SchemaID does not match 
schema.ID", iceberg.ErrInvalidArgument)
+// Equals checks whether the other Version would behave the same
+// while ignoring the view version id, and the creation timestamp
+func (v *Version) Equals(other *Version) bool {
+       return v.SchemaID == other.SchemaID &&
+               v.DefaultCatalog == other.DefaultCatalog &&
+               slices.Equal(v.DefaultNamespace, other.DefaultNamespace) &&
+               maps.Equal(v.Summary, other.Summary) &&
+               slices.Equal(v.Representations, other.Representations)
+}
+
+func (v *Version) Clone() *Version {
+       if v == nil {
+               return nil
        }
 
-       return &metadata{
-               UUID:             uuid.NewString(),
-               FmtVersion:       1,
-               Loc:              loc,
-               SchemaList:       []*iceberg.Schema{schema},
-               CurrentVersionId: version.VersionID,
-               VersionList:      []Version{version},
-               VersionLogList: []VersionLogEntry{
-                       {
-                               TimestampMs: timestampMs,
-                               VersionID:   version.VersionID,
-                       },
-               },
-               Props: props,
-       }, nil
+       cloned := *v
+       cloned.Summary = maps.Clone(v.Summary)
+       cloned.Representations = slices.Clone(v.Representations)
+       cloned.DefaultNamespace = slices.Clone(v.DefaultNamespace)
+       return &cloned
 }
 
-// CreateMetadata creates a new view metadata file and writes it to storage.
-//
-// Returns the full path to the created metadata file, or an error if creation 
fails.
-//
-// Note: This function only supports creating new views with format version 1.
-// It does not support updating existing view metadata.
-func CreateMetadata(
-       ctx context.Context,
-       catalogName string,
-       nsIdent []string,
-       schema *iceberg.Schema,
-       viewSQL string,
-       loc string,
-       props iceberg.Properties,
-) (metadataLocation string, err error) {
-       versionId := int64(1)
-       timestampMs := time.Now().UnixMilli()
-
-       viewVersion := Version{
-               VersionID:   versionId,
-               TimestampMs: timestampMs,
-               SchemaID:    schema.ID,
-               Summary:     map[string]string{"sql": viewSQL},
-               Representations: []SQLRepresentation{
-                       {Type: "sql", SQL: viewSQL, Dialect: "default"},
-               },
-               DefaultCatalog:   &catalogName,
-               DefaultNamespace: nsIdent,
-       }
-
-       metadataLocation = loc + "/metadata/view-" + uuid.New().String() + 
".metadata.json"
-       viewMetadata, err := NewMetadata(schema, viewVersion, loc, props)
+type VersionLogEntry struct {
+       // Timestamp when the view's current-version-id was updated (ms from 
epoch)
+       TimestampMS int64 `json:"timestamp-ms"`
+       // ID that current-version-id was set to
+       VersionID int64 `json:"version-id"`
+}
+
+// ParseMetadata parses json metadata provided by the passed in reader,
+// returning an error if one is encountered.
+func ParseMetadata(r io.Reader) (Metadata, error) {
+       data, err := io.ReadAll(r)
        if err != nil {
-               return "", fmt.Errorf("failed to create view metadata: %w", err)
+               return nil, err
        }
 
-       viewMetadataBytes, err := json.Marshal(viewMetadata)
-       if err != nil {
-               return "", fmt.Errorf("failed to marshal view metadata: %w", 
err)
+       return ParseMetadataBytes(data)
+}
+
+// ParseMetadataString is like [ParseMetadata], but for a string rather than
+// an io.Reader.
+func ParseMetadataString(s string) (Metadata, error) {
+       return ParseMetadataBytes([]byte(s))
+}
+
+// ParseMetadataBytes is like [ParseMetadataString] but for a byte slice.
+func ParseMetadataBytes(b []byte) (Metadata, error) {
+       ver := struct {
+               FormatVersion int `json:"format-version"`
+       }{}
+       if err := json.Unmarshal(b, &ver); err != nil {
+               return nil, err
        }
 
-       fs, err := io.LoadFS(ctx, props, metadataLocation)
-       if err != nil {
-               return "", fmt.Errorf("failed to load filesystem for view 
metadata: %w", err)
+       var ret Metadata
+       ret = &metadata{}
+       return ret, json.Unmarshal(b, ret)
+}
+
+// indexBy indexes a slice into a map, using a provided extractKey function
+// The extractKey function will be called on each item in the slice and assign 
the
+// item as a value for that key in the resultant map.
+func indexBy[T any, K comparable](s []T, extractKey func(T) K) map[K]T {
+       index := make(map[K]T)
+       for _, v := range s {
+               index[extractKey(v)] = v
+       }
+       return index
+}
+
+// Cloner is an interface which implements a Clone method for deep copying 
itself
+type cloner[T any] interface {
+       // Clone returns a deep copy of the underlying object
+       Clone() T
+}
+
+// cloneSlice returns a deep-clone of a Slice of elements implementing Cloner
+func cloneSlice[T cloner[T]](val []T) []T {
+       cloned := make([]T, len(val))
+       for i, elem := range val {
+               cloned[i] = elem.Clone()
+       }
+       return cloned
+}
+
+// https://iceberg.apache.org/view-spec/
+type metadata struct {
+       FormatVersionValue    int                `json:"format-version"`
+       UUID                  uuid.UUID          `json:"view-uuid"`
+       Loc                   string             `json:"location"`
+       CurrentVersionIDValue int64              `json:"current-version-id"`
+       VersionList           []*Version         `json:"versions"`
+       SchemaList            []*iceberg.Schema  `json:"schemas"`
+       VersionLogList        []VersionLogEntry  `json:"version-log"`
+       Props                 iceberg.Properties `json:"properties,omitempty"`
+
+       // updates from builder if applicable
+       updates Updates
+
+       // cached lookup helpers, must be initialized in init()
+       lazyVersionsByID func() map[int64]*Version
+       lazySchemasByID  func() map[int]*iceberg.Schema
+}
+
+func (m *metadata) Equals(other Metadata) bool {
+       if other == nil {
+               return false
+       }
+
+       if m == other {
+               return true
+       }
+
+       return m.UUID == other.ViewUUID() &&
+               m.Loc == other.Location() &&
+               m.FormatVersionValue == other.FormatVersion() &&
+               iceinternal.SliceEqualHelper(m.Schemas(), other.Schemas()) &&
+               iceinternal.SliceEqualHelper(m.Versions(), other.Versions()) &&
+               m.CurrentVersionIDValue == other.CurrentVersionID() &&
+               slices.Equal(m.VersionLogList, other.VersionLog())
+}
+
+func (m *metadata) Updates() Updates { return m.updates }
+
+func (m *metadata) FormatVersion() int         { return m.FormatVersionValue }
+func (m *metadata) ViewUUID() uuid.UUID        { return m.UUID }
+func (m *metadata) Location() string           { return m.Loc }
+func (m *metadata) Versions() []*Version       { return m.VersionList }
+func (m *metadata) Schemas() []*iceberg.Schema { return m.SchemaList }
+func (m *metadata) SchemasByID() map[int]*iceberg.Schema {
+       return maps.Clone(m.lazySchemasByID())
+}
+
+func (m *metadata) CurrentVersionID() int64 {
+       return m.CurrentVersionIDValue
+}
+
+func (m *metadata) CurrentVersion() *Version {
+       version, ok := m.lazyVersionsByID()[m.CurrentVersionIDValue]
+       if !ok {
+               panic("current version not found")
        }
+       return version
+}
+
+func (m *metadata) CurrentSchemaID() int {
+       return m.CurrentVersion().SchemaID
+}
 
-       wfs, ok := fs.(io.WriteFileIO)
+func (m *metadata) CurrentSchema() *iceberg.Schema {
+       schema, ok := m.lazySchemasByID()[m.CurrentSchemaID()]
        if !ok {
-               return "", errors.New("filesystem IO does not support writing")
+               panic("current schema not found")
+       }
+       return schema
+}
+
+func (m *metadata) VersionLog() []VersionLogEntry {
+       return m.VersionLogList
+}
+
+func (m *metadata) Properties() iceberg.Properties {
+       return m.Props
+}
+
+func (m *metadata) validate() error {
+       if m.Loc == "" {
+               return fmt.Errorf("%w: location is required", 
ErrInvalidViewMetadata)
+       }
+
+       if m.UUID == uuid.Nil {
+               return fmt.Errorf("%w: view-uuid is required", 
ErrInvalidViewMetadata)
+       }

Review Comment:
   Update: I just turned the UUID field into a pointer so that we can 
distinguish between an unset value and an all-zero UUID ("000...") on deserialization.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to