This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 85238d2  feat(catalog): Standardize Catalog create table function 
(#245)
85238d2 is described below

commit 85238d218b658e098f6b5a050930a6cb0bbef822
Author: Matt Topol <zotthewiz...@gmail.com>
AuthorDate: Wed Jan 15 01:45:24 2025 -0500

    feat(catalog): Standardize Catalog create table function (#245)
    
    * standardize CreateTable
    
    * update catalog impl
    
    * add test for table.NewMetadata and AssignFresh* functions
    
    * add docstrings for the new functions
    
    * use proper type for return of With helpers
    
    * fix lint, missing func
    
    * Update catalog/catalog.go
    
    Co-authored-by: Kevin Liu <kevinjq...@users.noreply.github.com>
    
    ---------
    
    Co-authored-by: Kevin Liu <kevinjq...@users.noreply.github.com>
---
 catalog/catalog.go              |  40 +++++++++-
 catalog/glue.go                 |   4 +
 catalog/rest.go                 |  66 +++++++----------
 partitions.go                   |  31 ++++++++
 schema.go                       | 160 ++++++++++++++++++++++++++++++++++++++++
 schema_test.go                  |  26 +++++++
 table/metadata.go               |  74 +++++++++++++++++++
 table/metadata_internal_test.go | 115 +++++++++++++++++++++++++++++
 table/sorting.go                |  35 +++++++++
 9 files changed, 511 insertions(+), 40 deletions(-)

diff --git a/catalog/catalog.go b/catalog/catalog.go
index 0bc8f49..8e01abc 100644
--- a/catalog/catalog.go
+++ b/catalog/catalog.go
@@ -24,6 +24,7 @@ import (
        "fmt"
        "maps"
        "net/url"
+       "strings"
 
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/table"
@@ -156,6 +157,10 @@ type Catalog interface {
        // CatalogType returns the type of the catalog.
        CatalogType() CatalogType
 
+       // CreateTable creates a new iceberg table in the catalog using the 
provided identifier
+       // and schema. Options can be used to optionally provide location, 
partition spec, sort order,
+       // and custom properties.
+       CreateTable(ctx context.Context, identifier table.Identifier, schema 
*iceberg.Schema, opts ...createTableOpt) (*table.Table, error)
        // ListTables returns a list of table identifiers in the catalog, with 
the returned
        // identifiers containing the information required to load the table 
via that catalog.
        ListTables(ctx context.Context, namespace table.Identifier) 
([]table.Identifier, error)
@@ -217,7 +222,6 @@ func getUpdatedPropsAndUpdateSummary(currentProps 
iceberg.Properties, removals [
        if err := checkForOverlap(removals, updates); err != nil {
                return nil, PropertiesUpdateSummary{}, err
        }
-
        var (
                updatedProps = maps.Clone(currentProps)
                removed      = make([]string, 0, len(removals))
@@ -243,6 +247,38 @@ func getUpdatedPropsAndUpdateSummary(currentProps 
iceberg.Properties, removals [
                Updated: updated,
                Missing: iceberg.Difference(removals, removed),
        }
-
        return updatedProps, summary, nil
 }
+
+// createTableOpt is a functional option: a function that mutates the
+// createTableCfg being assembled for a CreateTable call.
+type createTableOpt func(*createTableCfg)
+
+// createTableCfg collects the optional settings that the With* helpers below
+// can supply. Fields that no option touches keep their zero value.
+type createTableCfg struct {
+       location      string
+       partitionSpec *iceberg.PartitionSpec
+       sortOrder     table.SortOrder
+       properties    iceberg.Properties
+}
+
+// WithLocation returns an option that sets the table location. Any trailing
+// "/" characters are trimmed from location before it is stored.
+func WithLocation(location string) createTableOpt {
+       return func(cfg *createTableCfg) {
+               cfg.location = strings.TrimRight(location, "/")
+       }
+}
+
+// WithPartitionSpec returns an option that sets the partition spec to use for
+// the new table. The pointer is stored as given, without copying.
+func WithPartitionSpec(spec *iceberg.PartitionSpec) createTableOpt {
+       return func(cfg *createTableCfg) {
+               cfg.partitionSpec = spec
+       }
+}
+
+// WithSortOrder returns an option that sets the sort order to use for the new
+// table.
+func WithSortOrder(order table.SortOrder) createTableOpt {
+       return func(cfg *createTableCfg) {
+               cfg.sortOrder = order
+       }
+}
+
+// WithProperties returns an option that sets the custom properties for the new
+// table. The map is stored as given, not copied, so it stays shared with the
+// caller.
+func WithProperties(props iceberg.Properties) createTableOpt {
+       return func(cfg *createTableCfg) {
+               cfg.properties = props
+       }
+}
diff --git a/catalog/glue.go b/catalog/glue.go
index 245116b..e628a4f 100644
--- a/catalog/glue.go
+++ b/catalog/glue.go
@@ -220,6 +220,10 @@ func (c *GlueCatalog) CatalogType() CatalogType {
        return Glue
 }
 
+// CreateTable is not implemented for the Glue catalog yet.
+//
+// NOTE(review): this panics unconditionally rather than returning an error,
+// even though the signature has an error result. Consider returning an error
+// so callers going through the Catalog interface can handle it.
+func (c *GlueCatalog) CreateTable(ctx context.Context, identifier 
table.Identifier, schema *iceberg.Schema, opts ...createTableOpt) 
(*table.Table, error) {
+       panic("create table not implemented for Glue Catalog")
+}
+
 // DropTable deletes an Iceberg table from the Glue catalog.
 func (c *GlueCatalog) DropTable(ctx context.Context, identifier 
table.Identifier) error {
        database, tableName, err := identifierToGlueTable(identifier)
diff --git a/catalog/rest.go b/catalog/rest.go
index d068ae5..efb9217 100644
--- a/catalog/rest.go
+++ b/catalog/rest.go
@@ -134,38 +134,6 @@ func (t *loadTableResponse) UnmarshalJSON(b []byte) (err 
error) {
        return
 }
 
-type createTableOption func(*createTableRequest)
-
-func WithLocation(loc string) createTableOption {
-       return func(req *createTableRequest) {
-               req.Location = strings.TrimRight(loc, "/")
-       }
-}
-
-func WithPartitionSpec(spec *iceberg.PartitionSpec) createTableOption {
-       return func(req *createTableRequest) {
-               req.PartitionSpec = spec
-       }
-}
-
-func WithWriteOrder(order *table.SortOrder) createTableOption {
-       return func(req *createTableRequest) {
-               req.WriteOrder = order
-       }
-}
-
-func WithStageCreate() createTableOption {
-       return func(req *createTableRequest) {
-               req.StageCreate = true
-       }
-}
-
-func WithProperties(props iceberg.Properties) createTableOption {
-       return func(req *createTableRequest) {
-               req.Props = props
-       }
-}
-
 type createTableRequest struct {
        Name          string                 `json:"name"`
        Schema        *iceberg.Schema        `json:"schema"`
@@ -700,18 +668,40 @@ func splitIdentForPath(ident table.Identifier) (string, 
string, error) {
        return strings.Join(NamespaceFromIdent(ident), namespaceSeparator), 
TableNameFromIdent(ident), nil
 }
 
-func (r *RestCatalog) CreateTable(ctx context.Context, identifier 
table.Identifier, schema *iceberg.Schema, opts ...createTableOption) 
(*table.Table, error) {
+func (r *RestCatalog) CreateTable(ctx context.Context, identifier 
table.Identifier, schema *iceberg.Schema, opts ...createTableOpt) 
(*table.Table, error) {
        ns, tbl, err := splitIdentForPath(identifier)
        if err != nil {
                return nil, err
        }
 
-       payload := createTableRequest{
-               Name:   tbl,
-               Schema: schema,
-       }
+       var cfg createTableCfg
        for _, o := range opts {
-               o(&payload)
+               o(&cfg)
+       }
+
+       freshSchema, err := iceberg.AssignFreshSchemaIDs(schema, nil)
+       if err != nil {
+               return nil, err
+       }
+
+       freshPartitionSpec, err := 
iceberg.AssignFreshPartitionSpecIDs(cfg.partitionSpec, schema, freshSchema)
+       if err != nil {
+               return nil, err
+       }
+
+       freshSortOrder, err := table.AssignFreshSortOrderIDs(cfg.sortOrder, 
schema, freshSchema)
+       if err != nil {
+               return nil, err
+       }
+
+       payload := createTableRequest{
+               Name:          tbl,
+               Schema:        freshSchema,
+               Location:      cfg.location,
+               PartitionSpec: &freshPartitionSpec,
+               WriteOrder:    &freshSortOrder,
+               StageCreate:   false,
+               Props:         cfg.properties,
        }
 
        ret, err := doPost[createTableRequest, loadTableResponse](ctx, 
r.baseURI, []string{"namespaces", ns, "tables"}, payload,
diff --git a/partitions.go b/partitions.go
index 9416d70..ebf97ff 100644
--- a/partitions.go
+++ b/partitions.go
@@ -235,3 +235,34 @@ func (ps *PartitionSpec) PartitionType(schema *Schema) 
*StructType {
        }
        return &StructType{FieldList: nestedFields}
 }
+
+// AssignFreshPartitionSpecIDs builds a new PartitionSpec whose fields refer to
+// the columns of the fresh schema. Each field's source column is resolved by
+// name: its SourceID is looked up in the old schema, and the resulting column
+// name is then looked up in the fresh schema. Partition field IDs are
+// renumbered to partitionDataIDStart plus the field's position in the spec;
+// names and transforms are carried over unchanged.
+//
+// A nil spec yields an empty PartitionSpec and no error. An error is returned
+// when a source column cannot be found in either schema.
+func AssignFreshPartitionSpecIDs(spec *PartitionSpec, old, fresh *Schema) (PartitionSpec, error) {
+       if spec == nil {
+               return PartitionSpec{}, nil
+       }
+
+       remapped := make([]PartitionField, 0, len(spec.fields))
+       for idx := range spec.fields {
+               src := spec.fields[idx]
+
+               colName, found := old.FindColumnName(src.SourceID)
+               if !found {
+                       return PartitionSpec{}, fmt.Errorf("could not find field in old schema: %s", src.Name)
+               }
+
+               target, found := fresh.FindFieldByName(colName)
+               if !found {
+                       return PartitionSpec{}, fmt.Errorf("could not find field in fresh schema: %s", src.Name)
+               }
+
+               remapped = append(remapped, PartitionField{
+                       Name:      src.Name,
+                       SourceID:  target.ID,
+                       FieldID:   partitionDataIDStart + idx,
+                       Transform: src.Transform,
+               })
+       }
+
+       return NewPartitionSpec(remapped...), nil
+}
diff --git a/schema.go b/schema.go
index 18014dc..04c29e7 100644
--- a/schema.go
+++ b/schema.go
@@ -588,6 +588,79 @@ func visitField[T any](f NestedField, visitor 
SchemaVisitor[T]) T {
        }
 }
 
+// PreOrderSchemaVisitor is the visitor interface accepted by PreOrderVisit.
+// Child results are handed to each method as func() T closures rather than as
+// already-computed values, so the implementation chooses when (and whether)
+// each child is visited.
+type PreOrderSchemaVisitor[T any] interface {
+       Schema(*Schema, func() T) T
+       Struct(StructType, []func() T) T
+       Field(NestedField, func() T) T
+       List(ListType, func() T) T
+       Map(MapType, func() T, func() T) T
+       Primitive(PrimitiveType) T
+}
+
+// PreOrderVisit walks sc with the given visitor. Each visitor method receives
+// closures for its children, so the visitor controls when the children are
+// visited.
+//
+// A nil schema yields an error wrapping ErrInvalidArgument. A panic raised
+// while visiting is recovered and reported through err, whatever the type of
+// the panic value; res is then the zero value of T.
+func PreOrderVisit[T any](sc *Schema, visitor PreOrderSchemaVisitor[T]) (res T, err error) {
+       if sc == nil {
+               err = fmt.Errorf("%w: cannot visit nil schema", ErrInvalidArgument)
+               return
+       }
+
+       defer func() {
+               if r := recover(); r != nil {
+                       switch e := r.(type) {
+                       case string:
+                               err = fmt.Errorf("error encountered during schema visitor: %s", e)
+                       case error:
+                               err = fmt.Errorf("error encountered during schema visitor: %w", e)
+                       default:
+                               // Without this case a panic carrying any other value
+                               // was swallowed: the caller got a zero result and a
+                               // nil error.
+                               err = fmt.Errorf("error encountered during schema visitor: %v", e)
+                       }
+               }
+       }()
+
+       return visitor.Schema(sc, func() T {
+               return visitStructPreOrder(sc.AsStruct(), visitor)
+       }), nil
+}
+
+// visitStructPreOrder wraps the visit of every field of obj in a closure and
+// passes those closures, in field order, to visitor.Struct. No field is
+// visited until the visitor invokes its closure.
+func visitStructPreOrder[T any](obj StructType, visitor PreOrderSchemaVisitor[T]) T {
+       thunks := make([]func() T, 0, len(obj.FieldList))
+       for _, field := range obj.FieldList {
+               thunks = append(thunks, func() T {
+                       return visitFieldPreOrder(field, visitor)
+               })
+       }
+
+       return visitor.Struct(obj, thunks)
+}
+
+// visitListPreOrder passes obj to visitor.List together with a closure that
+// visits the list's element field when invoked.
+func visitListPreOrder[T any](obj ListType, visitor PreOrderSchemaVisitor[T]) T {
+       visitElem := func() T {
+               return visitFieldPreOrder(obj.ElementField(), visitor)
+       }
+
+       return visitor.List(obj, visitElem)
+}
+
+// visitMapPreOrder passes obj to visitor.Map together with two closures: the
+// first visits the map's key field, the second its value field.
+func visitMapPreOrder[T any](obj MapType, visitor PreOrderSchemaVisitor[T]) T {
+       visitKey := func() T {
+               return visitFieldPreOrder(obj.KeyField(), visitor)
+       }
+       visitValue := func() T {
+               return visitFieldPreOrder(obj.ValueField(), visitor)
+       }
+
+       return visitor.Map(obj, visitKey, visitValue)
+}
+
+// visitFieldPreOrder calls visitor.Field with f and a closure that visits f's
+// type. Struct, list and map types dispatch to their pre-order helpers; any
+// other type is asserted to be a PrimitiveType and passed to
+// visitor.Primitive. The assertion runs only when the closure is invoked.
+func visitFieldPreOrder[T any](f NestedField, visitor PreOrderSchemaVisitor[T]) T {
+       switch typ := f.Type.(type) {
+       case *StructType:
+               return visitor.Field(f, func() T { return visitStructPreOrder(*typ, visitor) })
+       case *ListType:
+               return visitor.Field(f, func() T { return visitListPreOrder(*typ, visitor) })
+       case *MapType:
+               return visitor.Field(f, func() T { return visitMapPreOrder(*typ, visitor) })
+       default:
+               return visitor.Field(f, func() T { return visitor.Primitive(typ.(PrimitiveType)) })
+       }
+}
+
 // IndexByID performs a post-order traversal of the given schema and
 // returns a mapping from field ID to field.
 func IndexByID(schema *Schema) (map[int]NestedField, error) {
@@ -1069,6 +1142,93 @@ func buildAccessors(schema *Schema) (map[int]accessor, 
error) {
        return Visit(schema, buildPosAccessors{})
 }
 
+// setFreshIDs is the PreOrderSchemaVisitor that rebuilds a schema's types with
+// IDs drawn from nextIDFunc. oldIdToNew records, for every ID it replaces, the
+// new ID that was handed out in its place.
+type setFreshIDs struct {
+       oldIdToNew map[int]int
+       nextIDFunc func() int
+}
+
+// getAndInc draws the next ID from nextIDFunc, records it as the replacement
+// for currentID and returns it.
+func (s *setFreshIDs) getAndInc(currentID int) int {
+       fresh := s.nextIDFunc()
+       s.oldIdToNew[currentID] = fresh
+
+       return fresh
+}
+
+// Schema returns the rebuilt top-level struct.
+func (s *setFreshIDs) Schema(_ *Schema, structResult func() Type) Type {
+       return structResult()
+}
+
+// Struct rebuilds st field by field. A field receives its own new ID before
+// its type is visited, so the IDs nested under one field are handed out before
+// the next sibling gets its ID.
+func (s *setFreshIDs) Struct(st StructType, fieldResults []func() Type) Type {
+       rebuilt := make([]NestedField, 0, len(st.FieldList))
+       for i := range st.FieldList {
+               orig := st.FieldList[i]
+
+               id := s.getAndInc(orig.ID)
+               typ := fieldResults[i]()
+
+               rebuilt = append(rebuilt, NestedField{
+                       ID:       id,
+                       Name:     orig.Name,
+                       Type:     typ,
+                       Doc:      orig.Doc,
+                       Required: orig.Required,
+               })
+       }
+
+       return &StructType{FieldList: rebuilt}
+}
+
+// Field returns the rebuilt type of the field; the field's ID itself is
+// assigned by the enclosing Struct, List or Map callback.
+func (s *setFreshIDs) Field(_ NestedField, fieldResult func() Type) Type {
+       return fieldResult()
+}
+
+// List assigns the element ID first and only then visits the element type.
+func (s *setFreshIDs) List(list ListType, elemResult func() Type) Type {
+       freshElemID := s.getAndInc(list.ElementID)
+       elemType := elemResult()
+
+       return &ListType{
+               ElementID:       freshElemID,
+               Element:         elemType,
+               ElementRequired: list.ElementRequired,
+       }
+}
+
+// Map assigns the key ID and the value ID, in that order, before visiting the
+// key type and then the value type.
+func (s *setFreshIDs) Map(mapType MapType, keyResult, valueResult func() Type) Type {
+       freshKeyID := s.getAndInc(mapType.KeyID)
+       freshValueID := s.getAndInc(mapType.ValueID)
+       keyType := keyResult()
+       valueType := valueResult()
+
+       return &MapType{
+               KeyID:         freshKeyID,
+               ValueID:       freshValueID,
+               KeyType:       keyType,
+               ValueType:     valueType,
+               ValueRequired: mapType.ValueRequired,
+       }
+}
+
+// Primitive returns p unchanged; primitive types carry no IDs.
+func (s *setFreshIDs) Primitive(p PrimitiveType) Type {
+       return p
+}
+
+// AssignFreshSchemaIDs creates a new schema, with schema ID 0, in which every
+// field, list element, map key and map value has a fresh ID. The nextID
+// function is called once per ID that has to be assigned; if it is nil, a
+// simple incrementing counter starting at 1 is used. The identifier field IDs
+// of sc are translated to the corresponding fresh IDs.
+//
+// An error is returned if sc is nil, if visiting the schema fails, or if an
+// identifier field ID of sc does not refer to any field in the schema.
+func AssignFreshSchemaIDs(sc *Schema, nextID func() int) (*Schema, error) {
+       if nextID == nil {
+               var id int
+               nextID = func() int {
+                       id++
+                       return id
+               }
+       }
+
+       visitor := &setFreshIDs{oldIdToNew: make(map[int]int), nextIDFunc: nextID}
+       outType, err := PreOrderVisit(sc, visitor)
+       if err != nil {
+               return nil, err
+       }
+
+       // Checked assertion: an unexpected (or nil) visitor result is reported as
+       // an error instead of panicking.
+       st, ok := outType.(*StructType)
+       if !ok {
+               return nil, fmt.Errorf("%w: schema visitor produced %T, expected *StructType", ErrInvalidArgument, outType)
+       }
+
+       var newIdentifierIDs []int
+       if len(sc.IdentifierFieldIDs) != 0 {
+               newIdentifierIDs = make([]int, 0, len(sc.IdentifierFieldIDs))
+               for _, oldID := range sc.IdentifierFieldIDs {
+                       // A missing entry used to be mapped silently to ID 0, which
+                       // refers to no field in the fresh schema.
+                       newID, found := visitor.oldIdToNew[oldID]
+                       if !found {
+                               return nil, fmt.Errorf("%w: identifier field id %d not found in schema", ErrInvalidArgument, oldID)
+                       }
+                       newIdentifierIDs = append(newIdentifierIDs, newID)
+               }
+       }
+
+       return NewSchemaWithIdentifiers(0, newIdentifierIDs, st.FieldList...), nil
+}
+
 type SchemaWithPartnerVisitor[T, P any] interface {
        Schema(sc *Schema, schemaPartner P, structResult T) T
        Struct(st StructType, structPartner P, fieldResults []T) T
diff --git a/schema_test.go b/schema_test.go
index 4e8e746..d080b9f 100644
--- a/schema_test.go
+++ b/schema_test.go
@@ -641,6 +641,32 @@ func TestPruneNilSchema(t *testing.T) {
        assert.ErrorIs(t, err, iceberg.ErrInvalidArgument)
 }
 
+// TestAssignFreshSchemaIDs reassigns the IDs of tableSchemaNested from a
+// counter that starts just above 100, then walks the result depth-first and
+// checks that the IDs appear in exactly the order the counter produced them.
+func TestAssignFreshSchemaIDs(t *testing.T) {
+       const base = 100
+
+       next := base
+       sc, err := iceberg.AssignFreshSchemaIDs(tableSchemaNested, func() int {
+               next++
+               return next
+       })
+       require.NoError(t, err)
+       require.NotNil(t, sc)
+
+       want := base
+       var verify func(iceberg.NestedField)
+       verify = func(f iceberg.NestedField) {
+               want++
+               assert.Equal(t, want, f.ID)
+
+               nested, ok := f.Type.(iceberg.NestedType)
+               if !ok {
+                       return
+               }
+               for _, child := range nested.Fields() {
+                       verify(child)
+               }
+       }
+
+       for _, f := range sc.Fields() {
+               verify(f)
+       }
+}
+
 func TestSchemaRoundTrip(t *testing.T) {
        data, err := json.Marshal(tableSchemaNested)
        require.NoError(t, err)
diff --git a/table/metadata.go b/table/metadata.go
index 73021c5..bd4896d 100644
--- a/table/metadata.go
+++ b/table/metadata.go
@@ -25,6 +25,7 @@ import (
        "iter"
        "maps"
        "slices"
+       "strconv"
        "time"
 
        "github.com/apache/iceberg-go"
@@ -1029,3 +1030,76 @@ func (m *metadataV2) UnmarshalJSON(b []byte) error {
        m.preValidate()
        return m.validate()
 }
+
+// DefaultFormatVersion is the table format version that NewMetadataWithUUID
+// uses when props carries no "format-version" entry.
+const DefaultFormatVersion = 2
+
+// NewMetadata creates a new table metadata object from the provided schema,
+// partition spec, sort order, location and properties, generating a fresh UUID
+// for the table. It delegates to NewMetadataWithUUID with uuid.Nil. By default
+// this produces V2 table metadata (see DefaultFormatVersion); a
+// "format-version" entry in props selects a different version, and an error is
+// returned when the selected version is not a supported one. See
+// NewMetadataWithUUID for how that property is interpreted.
+func NewMetadata(sc *iceberg.Schema, partitions *iceberg.PartitionSpec, 
sortOrder SortOrder, location string, props iceberg.Properties) (Metadata, 
error) {
+       return NewMetadataWithUUID(sc, partitions, sortOrder, location, props, 
uuid.Nil)
+}
+
+// NewMetadataWithUUID is like NewMetadata, but allows the caller to specify the
+// UUID of the table rather than creating a new one. Passing uuid.Nil requests
+// a freshly generated UUID.
+//
+// The schema, partition spec and sort order are all re-numbered with fresh IDs
+// before being stored. The format version defaults to DefaultFormatVersion and
+// can be overridden with a "format-version" entry in props; that entry is not
+// kept in the stored properties. props itself is never modified: the metadata
+// holds its own copy.
+//
+// An error is returned if assigning fresh IDs fails, if "format-version" is not
+// an integer, or if it names an unsupported version.
+func NewMetadataWithUUID(sc *iceberg.Schema, partitions *iceberg.PartitionSpec, sortOrder SortOrder, location string, props iceberg.Properties, tableUuid uuid.UUID) (Metadata, error) {
+       freshSchema, err := iceberg.AssignFreshSchemaIDs(sc, nil)
+       if err != nil {
+               return nil, err
+       }
+
+       freshPartitions, err := iceberg.AssignFreshPartitionSpecIDs(partitions, sc, freshSchema)
+       if err != nil {
+               return nil, err
+       }
+
+       freshSortOrder, err := AssignFreshSortOrderIDs(sortOrder, sc, freshSchema)
+       if err != nil {
+               return nil, err
+       }
+
+       if tableUuid == uuid.Nil {
+               tableUuid = uuid.New()
+       }
+
+       // Work on a copy so that stripping "format-version" below does not
+       // modify the caller's map. Cloning a nil map yields nil.
+       props = maps.Clone(props)
+
+       formatVersion := DefaultFormatVersion
+       if verStr, ok := props["format-version"]; ok {
+               // A malformed version is an error, as documented, rather than a
+               // silent fallback to the default.
+               if formatVersion, err = strconv.Atoi(verStr); err != nil {
+                       return nil, fmt.Errorf("invalid format version %q: %w", verStr, err)
+               }
+               delete(props, "format-version")
+       }
+
+       lastPartitionID := freshPartitions.LastAssignedFieldID()
+       common := commonMetadata{
+               LastUpdatedMS:      time.Now().UnixMilli(),
+               LastColumnId:       freshSchema.HighestFieldID(),
+               FormatVersion:      formatVersion,
+               UUID:               tableUuid,
+               Loc:                location,
+               SchemaList:         []*iceberg.Schema{freshSchema},
+               CurrentSchemaID:    freshSchema.ID,
+               Specs:              []iceberg.PartitionSpec{freshPartitions},
+               DefaultSpecID:      freshPartitions.ID(),
+               LastPartitionID:    &lastPartitionID,
+               Props:              props,
+               SortOrderList:      []SortOrder{freshSortOrder},
+               DefaultSortOrderID: freshSortOrder.OrderID,
+       }
+
+       switch formatVersion {
+       case 1:
+               return &metadataV1{
+                       commonMetadata: common,
+                       Schema:         freshSchema,
+                       Partition:      slices.Collect(freshPartitions.Fields()),
+               }, nil
+       case 2:
+               return &metadataV2{commonMetadata: common}, nil
+       default:
+               return nil, fmt.Errorf("invalid format version: %d", formatVersion)
+       }
+}
diff --git a/table/metadata_internal_test.go b/table/metadata_internal_test.go
index a02ac7f..44aa037 100644
--- a/table/metadata_internal_test.go
+++ b/table/metadata_internal_test.go
@@ -491,3 +491,118 @@ func TestV1WriteMetadataToV2(t *testing.T) {
        assert.NotContains(t, rawData, "schema")
        assert.NotContains(t, rawData, "partition-spec")
 }
+
+// TestNewMetadataWithExplicitV1Format builds metadata with the property
+// "format-version" set to "1" and checks that the result is V1 metadata in
+// which the schema, partition spec and sort order have all been re-numbered.
+func TestNewMetadataWithExplicitV1Format(t *testing.T) {
+       const location = "s3://some_v1_location/"
+
+       inputSchema := iceberg.NewSchemaWithIdentifiers(10, []int{22},
+               iceberg.NestedField{ID: 10, Name: "foo", Type: iceberg.PrimitiveTypes.String, Required: false},
+               iceberg.NestedField{ID: 22, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true},
+               iceberg.NestedField{ID: 33, Name: "baz", Type: iceberg.PrimitiveTypes.Bool, Required: false},
+       )
+       inputSpec := iceberg.NewPartitionSpecID(10, iceberg.PartitionField{
+               SourceID: 22, FieldID: 1022, Transform: iceberg.IdentityTransform{}, Name: "bar",
+       })
+       inputOrder := SortOrder{
+               OrderID: 10,
+               Fields: []SortField{{
+                       SourceID:  10,
+                       Transform: iceberg.IdentityTransform{},
+                       Direction: SortASC,
+                       NullOrder: NullsLast,
+               }},
+       }
+
+       actual, err := NewMetadata(inputSchema, &inputSpec, inputOrder, location, iceberg.Properties{"format-version": "1"})
+       require.NoError(t, err)
+
+       // Expected values after re-numbering: columns 1..3, schema ID 0,
+       // partition field ID 1000 and sort order ID 1.
+       wantSchema := iceberg.NewSchemaWithIdentifiers(0, []int{2},
+               iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true},
+               iceberg.NestedField{ID: 3, Name: "baz", Type: iceberg.PrimitiveTypes.Bool},
+       )
+       wantSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{
+               SourceID: 2, FieldID: 1000, Transform: iceberg.IdentityTransform{}, Name: "bar",
+       })
+       wantOrder := SortOrder{
+               OrderID: 1,
+               Fields: []SortField{{
+                       SourceID:  1,
+                       Transform: iceberg.IdentityTransform{},
+                       Direction: SortASC,
+                       NullOrder: NullsLast,
+               }},
+       }
+       wantLastPartitionID := 1000
+
+       expected := &metadataV1{
+               commonMetadata: commonMetadata{
+                       FormatVersion:      1,
+                       UUID:               actual.TableUUID(),
+                       Loc:                location,
+                       LastUpdatedMS:      actual.LastUpdatedMillis(),
+                       LastColumnId:       3,
+                       SchemaList:         []*iceberg.Schema{wantSchema},
+                       CurrentSchemaID:    0,
+                       Specs:              []iceberg.PartitionSpec{wantSpec},
+                       DefaultSpecID:      0,
+                       LastPartitionID:    &wantLastPartitionID,
+                       SortOrderList:      []SortOrder{wantOrder},
+                       DefaultSortOrderID: 1,
+               },
+               Schema:    wantSchema,
+               Partition: slices.Collect(wantSpec.Fields()),
+       }
+
+       assert.Truef(t, expected.Equals(actual), "expected: %s\ngot: %s", expected, actual)
+}
+
+// TestNewMetadataV2Format builds metadata with nil properties and an explicit
+// table UUID, and checks that the result is V2 metadata carrying that UUID,
+// with the schema, partition spec and sort order all re-numbered.
+func TestNewMetadataV2Format(t *testing.T) {
+       const location = "s3://some_v1_location/"
+
+       inputSchema := iceberg.NewSchemaWithIdentifiers(10, []int{22},
+               iceberg.NestedField{ID: 10, Name: "foo", Type: iceberg.PrimitiveTypes.String, Required: false},
+               iceberg.NestedField{ID: 22, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true},
+               iceberg.NestedField{ID: 33, Name: "baz", Type: iceberg.PrimitiveTypes.Bool, Required: false},
+       )
+       inputSpec := iceberg.NewPartitionSpecID(10, iceberg.PartitionField{
+               SourceID: 22, FieldID: 1022, Transform: iceberg.IdentityTransform{}, Name: "bar",
+       })
+       inputOrder := SortOrder{
+               OrderID: 10,
+               Fields: []SortField{{
+                       SourceID:  10,
+                       Transform: iceberg.IdentityTransform{},
+                       Direction: SortASC,
+                       NullOrder: NullsLast,
+               }},
+       }
+
+       tableUUID := uuid.New()
+
+       actual, err := NewMetadataWithUUID(inputSchema, &inputSpec, inputOrder, location, nil, tableUUID)
+       require.NoError(t, err)
+
+       // Expected values after re-numbering: columns 1..3, schema ID 0,
+       // partition field ID 1000 and sort order ID 1.
+       wantSchema := iceberg.NewSchemaWithIdentifiers(0, []int{2},
+               iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true},
+               iceberg.NestedField{ID: 3, Name: "baz", Type: iceberg.PrimitiveTypes.Bool},
+       )
+       wantSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{
+               SourceID: 2, FieldID: 1000, Transform: iceberg.IdentityTransform{}, Name: "bar",
+       })
+       wantOrder := SortOrder{
+               OrderID: 1,
+               Fields: []SortField{{
+                       SourceID:  1,
+                       Transform: iceberg.IdentityTransform{},
+                       Direction: SortASC,
+                       NullOrder: NullsLast,
+               }},
+       }
+       wantLastPartitionID := 1000
+
+       expected := &metadataV2{
+               commonMetadata: commonMetadata{
+                       FormatVersion:      2,
+                       UUID:               tableUUID,
+                       Loc:                location,
+                       LastUpdatedMS:      actual.LastUpdatedMillis(),
+                       LastColumnId:       3,
+                       SchemaList:         []*iceberg.Schema{wantSchema},
+                       CurrentSchemaID:    0,
+                       Specs:              []iceberg.PartitionSpec{wantSpec},
+                       DefaultSpecID:      0,
+                       LastPartitionID:    &wantLastPartitionID,
+                       SortOrderList:      []SortOrder{wantOrder},
+                       DefaultSortOrderID: 1,
+               },
+       }
+
+       assert.Truef(t, expected.Equals(actual), "expected: %s\ngot: %s", expected, actual)
+}
diff --git a/table/sorting.go b/table/sorting.go
index 425a92e..e510439 100644
--- a/table/sorting.go
+++ b/table/sorting.go
@@ -175,3 +175,38 @@ func (s *SortOrder) UnmarshalJSON(b []byte) error {
 
        return nil
 }
+
+// AssignFreshSortOrderIDs updates and reassigns the field source IDs from the
+// old schema to the corresponding fields in the fresh schema, and gives the
+// returned sort order the ID InitialSortOrderID. It delegates to
+// AssignFreshSortOrderIDsWithID.
+//
+// NOTE(review): this comment previously described the fresh ID as 0, but the
+// metadata tests added alongside this function expect OrderID 1 — confirm
+// against the definition of InitialSortOrderID.
+func AssignFreshSortOrderIDs(sortOrder SortOrder, old, fresh *iceberg.Schema) 
(SortOrder, error) {
+       return AssignFreshSortOrderIDsWithID(sortOrder, old, fresh, 
InitialSortOrderID)
+}
+
+// AssignFreshSortOrderIDsWithID is like AssignFreshSortOrderIDs but lets the
+// caller choose the ID of the returned SortOrder.
+//
+// A sort order equal to UnsortedSortOrder is returned as UnsortedSortOrder,
+// ignoring sortOrderID. Otherwise every sort field is re-pointed by name: its
+// SourceID is resolved to a column name in the old schema, and that name is
+// resolved to a field in the fresh schema. Transform, direction and null order
+// are carried over unchanged. An error is returned when either lookup fails.
+func AssignFreshSortOrderIDsWithID(sortOrder SortOrder, old, fresh *iceberg.Schema, sortOrderID int) (SortOrder, error) {
+       if sortOrder.Equals(UnsortedSortOrder) {
+               return UnsortedSortOrder, nil
+       }
+
+       remapped := make([]SortField, 0, len(sortOrder.Fields))
+       for i := range sortOrder.Fields {
+               src := sortOrder.Fields[i]
+
+               colName, found := old.FindColumnName(src.SourceID)
+               if !found {
+                       return SortOrder{}, fmt.Errorf("cannot find source column id %s in old schema", src.String())
+               }
+
+               target, found := fresh.FindFieldByName(colName)
+               if !found {
+                       return SortOrder{}, fmt.Errorf("cannot find field %s in fresh schema", colName)
+               }
+
+               remapped = append(remapped, SortField{
+                       SourceID:  target.ID,
+                       Transform: src.Transform,
+                       Direction: src.Direction,
+                       NullOrder: src.NullOrder,
+               })
+       }
+
+       return SortOrder{OrderID: sortOrderID, Fields: remapped}, nil
+}

Reply via email to