This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 85238d2 feat(catalog): Standardize Catalog create table function
(#245)
85238d2 is described below
commit 85238d218b658e098f6b5a050930a6cb0bbef822
Author: Matt Topol <[email protected]>
AuthorDate: Wed Jan 15 01:45:24 2025 -0500
feat(catalog): Standardize Catalog create table function (#245)
* standardize CreateTable
* update catalog impl
* add test for table.NewMetadata and AssignFresh* functions
* add docstrings for the new functions
* use proper type for return of With helpers
* fix lint, missing func
* Update catalog/catalog.go
Co-authored-by: Kevin Liu <[email protected]>
---------
Co-authored-by: Kevin Liu <[email protected]>
---
catalog/catalog.go | 40 +++++++++-
catalog/glue.go | 4 +
catalog/rest.go | 66 +++++++----------
partitions.go | 31 ++++++++
schema.go | 160 ++++++++++++++++++++++++++++++++++++++++
schema_test.go | 26 +++++++
table/metadata.go | 74 +++++++++++++++++++
table/metadata_internal_test.go | 115 +++++++++++++++++++++++++++++
table/sorting.go | 35 +++++++++
9 files changed, 511 insertions(+), 40 deletions(-)
diff --git a/catalog/catalog.go b/catalog/catalog.go
index 0bc8f49..8e01abc 100644
--- a/catalog/catalog.go
+++ b/catalog/catalog.go
@@ -24,6 +24,7 @@ import (
"fmt"
"maps"
"net/url"
+ "strings"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
@@ -156,6 +157,10 @@ type Catalog interface {
// CatalogType returns the type of the catalog.
CatalogType() CatalogType
+ // CreateTable creates a new Iceberg table in the catalog using the provided identifier
+ // and schema. Options may be used to provide a location, partition spec, sort order,
+ // and custom properties.
+ CreateTable(ctx context.Context, identifier table.Identifier, schema
*iceberg.Schema, opts ...createTableOpt) (*table.Table, error)
// ListTables returns a list of table identifiers in the catalog, with
the returned
// identifiers containing the information required to load the table
via that catalog.
ListTables(ctx context.Context, namespace table.Identifier)
([]table.Identifier, error)
@@ -217,7 +222,6 @@ func getUpdatedPropsAndUpdateSummary(currentProps
iceberg.Properties, removals [
if err := checkForOverlap(removals, updates); err != nil {
return nil, PropertiesUpdateSummary{}, err
}
-
var (
updatedProps = maps.Clone(currentProps)
removed = make([]string, 0, len(removals))
@@ -243,6 +247,38 @@ func getUpdatedPropsAndUpdateSummary(currentProps
iceberg.Properties, removals [
Updated: updated,
Missing: iceberg.Difference(removals, removed),
}
-
return updatedProps, summary, nil
}
+
+type createTableOpt func(*createTableCfg)
+
+type createTableCfg struct {
+ location string
+ partitionSpec *iceberg.PartitionSpec
+ sortOrder table.SortOrder
+ properties iceberg.Properties
+}
+
+func WithLocation(location string) createTableOpt {
+ return func(cfg *createTableCfg) {
+ cfg.location = strings.TrimRight(location, "/")
+ }
+}
+
+func WithPartitionSpec(spec *iceberg.PartitionSpec) createTableOpt {
+ return func(cfg *createTableCfg) {
+ cfg.partitionSpec = spec
+ }
+}
+
+func WithSortOrder(order table.SortOrder) createTableOpt {
+ return func(cfg *createTableCfg) {
+ cfg.sortOrder = order
+ }
+}
+
+func WithProperties(props iceberg.Properties) createTableOpt {
+ return func(cfg *createTableCfg) {
+ cfg.properties = props
+ }
+}
diff --git a/catalog/glue.go b/catalog/glue.go
index 245116b..e628a4f 100644
--- a/catalog/glue.go
+++ b/catalog/glue.go
@@ -220,6 +220,10 @@ func (c *GlueCatalog) CatalogType() CatalogType {
return Glue
}
+func (c *GlueCatalog) CreateTable(ctx context.Context, identifier
table.Identifier, schema *iceberg.Schema, opts ...createTableOpt)
(*table.Table, error) {
+ panic("create table not implemented for Glue Catalog")
+}
+
// DropTable deletes an Iceberg table from the Glue catalog.
func (c *GlueCatalog) DropTable(ctx context.Context, identifier
table.Identifier) error {
database, tableName, err := identifierToGlueTable(identifier)
diff --git a/catalog/rest.go b/catalog/rest.go
index d068ae5..efb9217 100644
--- a/catalog/rest.go
+++ b/catalog/rest.go
@@ -134,38 +134,6 @@ func (t *loadTableResponse) UnmarshalJSON(b []byte) (err
error) {
return
}
-type createTableOption func(*createTableRequest)
-
-func WithLocation(loc string) createTableOption {
- return func(req *createTableRequest) {
- req.Location = strings.TrimRight(loc, "/")
- }
-}
-
-func WithPartitionSpec(spec *iceberg.PartitionSpec) createTableOption {
- return func(req *createTableRequest) {
- req.PartitionSpec = spec
- }
-}
-
-func WithWriteOrder(order *table.SortOrder) createTableOption {
- return func(req *createTableRequest) {
- req.WriteOrder = order
- }
-}
-
-func WithStageCreate() createTableOption {
- return func(req *createTableRequest) {
- req.StageCreate = true
- }
-}
-
-func WithProperties(props iceberg.Properties) createTableOption {
- return func(req *createTableRequest) {
- req.Props = props
- }
-}
-
type createTableRequest struct {
Name string `json:"name"`
Schema *iceberg.Schema `json:"schema"`
@@ -700,18 +668,40 @@ func splitIdentForPath(ident table.Identifier) (string,
string, error) {
return strings.Join(NamespaceFromIdent(ident), namespaceSeparator),
TableNameFromIdent(ident), nil
}
-func (r *RestCatalog) CreateTable(ctx context.Context, identifier
table.Identifier, schema *iceberg.Schema, opts ...createTableOption)
(*table.Table, error) {
+func (r *RestCatalog) CreateTable(ctx context.Context, identifier
table.Identifier, schema *iceberg.Schema, opts ...createTableOpt)
(*table.Table, error) {
ns, tbl, err := splitIdentForPath(identifier)
if err != nil {
return nil, err
}
- payload := createTableRequest{
- Name: tbl,
- Schema: schema,
- }
+ var cfg createTableCfg
for _, o := range opts {
- o(&payload)
+ o(&cfg)
+ }
+
+ freshSchema, err := iceberg.AssignFreshSchemaIDs(schema, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ freshPartitionSpec, err :=
iceberg.AssignFreshPartitionSpecIDs(cfg.partitionSpec, schema, freshSchema)
+ if err != nil {
+ return nil, err
+ }
+
+ freshSortOrder, err := table.AssignFreshSortOrderIDs(cfg.sortOrder,
schema, freshSchema)
+ if err != nil {
+ return nil, err
+ }
+
+ payload := createTableRequest{
+ Name: tbl,
+ Schema: freshSchema,
+ Location: cfg.location,
+ PartitionSpec: &freshPartitionSpec,
+ WriteOrder: &freshSortOrder,
+ StageCreate: false,
+ Props: cfg.properties,
}
ret, err := doPost[createTableRequest, loadTableResponse](ctx,
r.baseURI, []string{"namespaces", ns, "tables"}, payload,
diff --git a/partitions.go b/partitions.go
index 9416d70..ebf97ff 100644
--- a/partitions.go
+++ b/partitions.go
@@ -235,3 +235,34 @@ func (ps *PartitionSpec) PartitionType(schema *Schema)
*StructType {
}
return &StructType{FieldList: nestedFields}
}
+
+// AssignFreshPartitionSpecIDs creates a new PartitionSpec by remapping each field's source ID
+// from the old schema to the corresponding field in the fresh schema, while re-assigning
+// each partition field ID to partitionDataIDStart (1000) + the field's position in the spec.
+func AssignFreshPartitionSpecIDs(spec *PartitionSpec, old, fresh *Schema)
(PartitionSpec, error) {
+ if spec == nil {
+ return PartitionSpec{}, nil
+ }
+
+ newFields := make([]PartitionField, 0, len(spec.fields))
+ for pos, field := range spec.fields {
+ origCol, ok := old.FindColumnName(field.SourceID)
+ if !ok {
+ return PartitionSpec{}, fmt.Errorf("could not find
field in old schema: %s", field.Name)
+ }
+
+ freshField, ok := fresh.FindFieldByName(origCol)
+ if !ok {
+ return PartitionSpec{}, fmt.Errorf("could not find
field in fresh schema: %s", field.Name)
+ }
+
+ newFields = append(newFields, PartitionField{
+ Name: field.Name,
+ SourceID: freshField.ID,
+ FieldID: partitionDataIDStart + pos,
+ Transform: field.Transform,
+ })
+ }
+
+ return NewPartitionSpec(newFields...), nil
+}
diff --git a/schema.go b/schema.go
index 18014dc..04c29e7 100644
--- a/schema.go
+++ b/schema.go
@@ -588,6 +588,79 @@ func visitField[T any](f NestedField, visitor
SchemaVisitor[T]) T {
}
}
+type PreOrderSchemaVisitor[T any] interface {
+ Schema(*Schema, func() T) T
+ Struct(StructType, []func() T) T
+ Field(NestedField, func() T) T
+ List(ListType, func() T) T
+ Map(MapType, func() T, func() T) T
+ Primitive(PrimitiveType) T
+}
+
+func PreOrderVisit[T any](sc *Schema, visitor PreOrderSchemaVisitor[T]) (res
T, err error) {
+ if sc == nil {
+ err = fmt.Errorf("%w: cannot visit nil schema",
ErrInvalidArgument)
+ return
+ }
+
+ defer func() {
+ if r := recover(); r != nil {
+ switch e := r.(type) {
+ case string:
+ err = fmt.Errorf("error encountered during
schema visitor: %s", e)
+ case error:
+ err = fmt.Errorf("error encountered during
schema visitor: %w", e)
+ }
+ }
+ }()
+
+ return visitor.Schema(sc, func() T {
+ return visitStructPreOrder(sc.AsStruct(), visitor)
+ }), nil
+}
+
+func visitStructPreOrder[T any](obj StructType, visitor
PreOrderSchemaVisitor[T]) T {
+ results := make([]func() T, len(obj.FieldList))
+
+ for i, f := range obj.FieldList {
+ results[i] = func() T {
+ return visitFieldPreOrder(f, visitor)
+ }
+ }
+
+ return visitor.Struct(obj, results)
+}
+
+func visitListPreOrder[T any](obj ListType, visitor PreOrderSchemaVisitor[T])
T {
+ return visitor.List(obj, func() T {
+ return visitFieldPreOrder(obj.ElementField(), visitor)
+ })
+}
+
+func visitMapPreOrder[T any](obj MapType, visitor PreOrderSchemaVisitor[T]) T {
+ return visitor.Map(obj, func() T {
+ return visitFieldPreOrder(obj.KeyField(), visitor)
+ }, func() T {
+ return visitFieldPreOrder(obj.ValueField(), visitor)
+ })
+}
+
+func visitFieldPreOrder[T any](f NestedField, visitor
PreOrderSchemaVisitor[T]) T {
+ var fn func() T
+ switch typ := f.Type.(type) {
+ case *StructType:
+ fn = func() T { return visitStructPreOrder(*typ, visitor) }
+ case *ListType:
+ fn = func() T { return visitListPreOrder(*typ, visitor) }
+ case *MapType:
+ fn = func() T { return visitMapPreOrder(*typ, visitor) }
+ default:
+ fn = func() T { return visitor.Primitive(typ.(PrimitiveType)) }
+ }
+
+ return visitor.Field(f, fn)
+}
+
// IndexByID performs a post-order traversal of the given schema and
// returns a mapping from field ID to field.
func IndexByID(schema *Schema) (map[int]NestedField, error) {
@@ -1069,6 +1142,93 @@ func buildAccessors(schema *Schema) (map[int]accessor,
error) {
return Visit(schema, buildPosAccessors{})
}
+type setFreshIDs struct {
+ oldIdToNew map[int]int
+ nextIDFunc func() int
+}
+
+func (s *setFreshIDs) getAndInc(currentID int) int {
+ next := s.nextIDFunc()
+ s.oldIdToNew[currentID] = next
+ return next
+}
+
+func (s *setFreshIDs) Schema(_ *Schema, structResult func() Type) Type {
+ return structResult()
+}
+
+func (s *setFreshIDs) Struct(st StructType, fieldResults []func() Type) Type {
+ newFields := make([]NestedField, len(st.FieldList))
+ for idx, f := range st.FieldList {
+ newFields[idx] = NestedField{
+ ID: s.getAndInc(f.ID),
+ Name: f.Name,
+ Type: fieldResults[idx](),
+ Doc: f.Doc,
+ Required: f.Required,
+ }
+ }
+ return &StructType{FieldList: newFields}
+}
+
+func (s *setFreshIDs) Field(_ NestedField, fieldResult func() Type) Type {
+ return fieldResult()
+}
+
+func (s *setFreshIDs) List(list ListType, elemResult func() Type) Type {
+ elemID := s.getAndInc(list.ElementID)
+ return &ListType{
+ ElementID: elemID,
+ Element: elemResult(),
+ ElementRequired: list.ElementRequired,
+ }
+}
+
+func (s *setFreshIDs) Map(mapType MapType, keyResult, valueResult func() Type)
Type {
+ keyID := s.getAndInc(mapType.KeyID)
+ valueID := s.getAndInc(mapType.ValueID)
+ return &MapType{
+ KeyID: keyID,
+ ValueID: valueID,
+ KeyType: keyResult(),
+ ValueType: valueResult(),
+ ValueRequired: mapType.ValueRequired,
+ }
+}
+
+func (s *setFreshIDs) Primitive(p PrimitiveType) Type {
+ return p
+}
+
+// AssignFreshSchemaIDs creates a new schema with fresh field IDs for all of the
+// fields in it. The nextID function is used to iteratively generate the IDs; if
+// it is nil, a simple incrementing counter starting at 1 is used.
+func AssignFreshSchemaIDs(sc *Schema, nextID func() int) (*Schema, error) {
+ if nextID == nil {
+ var id int = 0
+ nextID = func() int {
+ id++
+ return id
+ }
+ }
+ visitor := &setFreshIDs{oldIdToNew: make(map[int]int), nextIDFunc:
nextID}
+ outType, err := PreOrderVisit(sc, visitor)
+ if err != nil {
+ return nil, err
+ }
+
+ fields := outType.(*StructType).FieldList
+ var newIdentifierIDs []int
+ if len(sc.IdentifierFieldIDs) != 0 {
+ newIdentifierIDs = make([]int, len(sc.IdentifierFieldIDs))
+ for i, id := range sc.IdentifierFieldIDs {
+ newIdentifierIDs[i] = visitor.oldIdToNew[id]
+ }
+ }
+
+ return NewSchemaWithIdentifiers(0, newIdentifierIDs, fields...), nil
+}
+
type SchemaWithPartnerVisitor[T, P any] interface {
Schema(sc *Schema, schemaPartner P, structResult T) T
Struct(st StructType, structPartner P, fieldResults []T) T
diff --git a/schema_test.go b/schema_test.go
index 4e8e746..d080b9f 100644
--- a/schema_test.go
+++ b/schema_test.go
@@ -641,6 +641,32 @@ func TestPruneNilSchema(t *testing.T) {
assert.ErrorIs(t, err, iceberg.ErrInvalidArgument)
}
+func TestAssignFreshSchemaIDs(t *testing.T) {
+ startID := 100
+ sc, err := iceberg.AssignFreshSchemaIDs(tableSchemaNested, func() int {
+ startID++
+ return startID
+ })
+ require.NoError(t, err)
+ require.NotNil(t, sc)
+
+ startID = 100
+ var checkID func(iceberg.NestedField)
+ checkID = func(f iceberg.NestedField) {
+ startID++
+ assert.Equal(t, startID, f.ID)
+ if nested, ok := f.Type.(iceberg.NestedType); ok {
+ for _, nf := range nested.Fields() {
+ checkID(nf)
+ }
+ }
+ }
+
+ for _, f := range sc.Fields() {
+ checkID(f)
+ }
+}
+
func TestSchemaRoundTrip(t *testing.T) {
data, err := json.Marshal(tableSchemaNested)
require.NoError(t, err)
diff --git a/table/metadata.go b/table/metadata.go
index 73021c5..bd4896d 100644
--- a/table/metadata.go
+++ b/table/metadata.go
@@ -25,6 +25,7 @@ import (
"iter"
"maps"
"slices"
+ "strconv"
"time"
"github.com/apache/iceberg-go"
@@ -1029,3 +1030,76 @@ func (m *metadataV2) UnmarshalJSON(b []byte) error {
m.preValidate()
return m.validate()
}
+
+const DefaultFormatVersion = 2
+
+// NewMetadata creates a new table metadata object from the provided schema, partition spec, sort order,
+// location and properties, generating a fresh UUID for the new table. By default, this generates V2
+// table metadata; a different version can be requested with a "format-version" entry in props (the entry
+// is removed from props). A non-integer value falls back to the default; an integer other than 1 or 2 is an error.
+func NewMetadata(sc *iceberg.Schema, partitions *iceberg.PartitionSpec,
sortOrder SortOrder, location string, props iceberg.Properties) (Metadata,
error) {
+ return NewMetadataWithUUID(sc, partitions, sortOrder, location, props,
uuid.Nil)
+}
+
+// NewMetadataWithUUID is like NewMetadata, but allows the caller to specify the UUID of the table; if it is uuid.Nil, a new UUID is generated.
+func NewMetadataWithUUID(sc *iceberg.Schema, partitions
*iceberg.PartitionSpec, sortOrder SortOrder, location string, props
iceberg.Properties, tableUuid uuid.UUID) (Metadata, error) {
+ freshSchema, err := iceberg.AssignFreshSchemaIDs(sc, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ freshPartitions, err := iceberg.AssignFreshPartitionSpecIDs(partitions,
sc, freshSchema)
+ if err != nil {
+ return nil, err
+ }
+
+ freshSortOrder, err := AssignFreshSortOrderIDs(sortOrder, sc,
freshSchema)
+ if err != nil {
+ return nil, err
+ }
+
+ if tableUuid == uuid.Nil {
+ tableUuid = uuid.New()
+ }
+
+ formatVersion := DefaultFormatVersion
+ if props != nil {
+ verStr, ok := props["format-version"]
+ if ok {
+ if formatVersion, err = strconv.Atoi(verStr); err !=
nil {
+ formatVersion = DefaultFormatVersion
+ }
+ delete(props, "format-version")
+ }
+ }
+
+ lastPartitionID := freshPartitions.LastAssignedFieldID()
+ common := commonMetadata{
+ LastUpdatedMS: time.Now().UnixMilli(),
+ LastColumnId: freshSchema.HighestFieldID(),
+ FormatVersion: formatVersion,
+ UUID: tableUuid,
+ Loc: location,
+ SchemaList: []*iceberg.Schema{freshSchema},
+ CurrentSchemaID: freshSchema.ID,
+ Specs: []iceberg.PartitionSpec{freshPartitions},
+ DefaultSpecID: freshPartitions.ID(),
+ LastPartitionID: &lastPartitionID,
+ Props: props,
+ SortOrderList: []SortOrder{freshSortOrder},
+ DefaultSortOrderID: freshSortOrder.OrderID,
+ }
+
+ switch formatVersion {
+ case 1:
+ return &metadataV1{
+ commonMetadata: common,
+ Schema: freshSchema,
+ Partition:
slices.Collect(freshPartitions.Fields()),
+ }, nil
+ case 2:
+ return &metadataV2{commonMetadata: common}, nil
+ default:
+ return nil, fmt.Errorf("invalid format version: %d",
formatVersion)
+ }
+}
diff --git a/table/metadata_internal_test.go b/table/metadata_internal_test.go
index a02ac7f..44aa037 100644
--- a/table/metadata_internal_test.go
+++ b/table/metadata_internal_test.go
@@ -491,3 +491,118 @@ func TestV1WriteMetadataToV2(t *testing.T) {
assert.NotContains(t, rawData, "schema")
assert.NotContains(t, rawData, "partition-spec")
}
+
+func TestNewMetadataWithExplicitV1Format(t *testing.T) {
+ schema := iceberg.NewSchemaWithIdentifiers(10,
+ []int{22},
+ iceberg.NestedField{ID: 10, Name: "foo", Type:
iceberg.PrimitiveTypes.String, Required: false},
+ iceberg.NestedField{ID: 22, Name: "bar", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ iceberg.NestedField{ID: 33, Name: "baz", Type:
iceberg.PrimitiveTypes.Bool, Required: false},
+ )
+
+ partitionSpec := iceberg.NewPartitionSpecID(10,
+ iceberg.PartitionField{SourceID: 22, FieldID: 1022, Transform:
iceberg.IdentityTransform{}, Name: "bar"})
+
+ sortOrder := SortOrder{
+ OrderID: 10,
+ Fields: []SortField{{
+ SourceID: 10,
+ Transform: iceberg.IdentityTransform{},
+ Direction: SortASC, NullOrder: NullsLast}}}
+
+ actual, err := NewMetadata(schema, &partitionSpec, sortOrder,
"s3://some_v1_location/", iceberg.Properties{"format-version": "1"})
+ require.NoError(t, err)
+
+ expectedSchema := iceberg.NewSchemaWithIdentifiers(0, []int{2},
+ iceberg.NestedField{ID: 1, Name: "foo", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 2, Name: "bar", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ iceberg.NestedField{ID: 3, Name: "baz", Type:
iceberg.PrimitiveTypes.Bool})
+
+ expectedSpec := iceberg.NewPartitionSpec(
+ iceberg.PartitionField{SourceID: 2, FieldID: 1000, Transform:
iceberg.IdentityTransform{}, Name: "bar"})
+
+ expectedSortOrder := SortOrder{
+ OrderID: 1,
+ Fields: []SortField{{
+ SourceID: 1, Transform: iceberg.IdentityTransform{},
+ Direction: SortASC, NullOrder: NullsLast}}}
+
+ lastPartitionID := 1000
+ expected := &metadataV1{
+ commonMetadata: commonMetadata{
+ Loc: "s3://some_v1_location/",
+ UUID: actual.TableUUID(),
+ LastUpdatedMS: actual.LastUpdatedMillis(),
+ LastColumnId: 3,
+ SchemaList: []*iceberg.Schema{expectedSchema},
+ CurrentSchemaID: 0,
+ Specs:
[]iceberg.PartitionSpec{expectedSpec},
+ DefaultSpecID: 0,
+ LastPartitionID: &lastPartitionID,
+ SortOrderList: []SortOrder{expectedSortOrder},
+ DefaultSortOrderID: 1,
+ FormatVersion: 1,
+ },
+ Schema: expectedSchema,
+ Partition: slices.Collect(expectedSpec.Fields()),
+ }
+
+ assert.Truef(t, expected.Equals(actual), "expected: %s\ngot: %s",
expected, actual)
+}
+
+func TestNewMetadataV2Format(t *testing.T) {
+ schema := iceberg.NewSchemaWithIdentifiers(10,
+ []int{22},
+ iceberg.NestedField{ID: 10, Name: "foo", Type:
iceberg.PrimitiveTypes.String, Required: false},
+ iceberg.NestedField{ID: 22, Name: "bar", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ iceberg.NestedField{ID: 33, Name: "baz", Type:
iceberg.PrimitiveTypes.Bool, Required: false},
+ )
+
+ partitionSpec := iceberg.NewPartitionSpecID(10,
+ iceberg.PartitionField{SourceID: 22, FieldID: 1022, Transform:
iceberg.IdentityTransform{}, Name: "bar"})
+
+ sortOrder := SortOrder{
+ OrderID: 10,
+ Fields: []SortField{{
+ SourceID: 10,
+ Transform: iceberg.IdentityTransform{},
+ Direction: SortASC, NullOrder: NullsLast}}}
+
+ tableUUID := uuid.New()
+
+ actual, err := NewMetadataWithUUID(schema, &partitionSpec, sortOrder,
"s3://some_v1_location/", nil, tableUUID)
+ require.NoError(t, err)
+
+ expectedSchema := iceberg.NewSchemaWithIdentifiers(0, []int{2},
+ iceberg.NestedField{ID: 1, Name: "foo", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 2, Name: "bar", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ iceberg.NestedField{ID: 3, Name: "baz", Type:
iceberg.PrimitiveTypes.Bool})
+
+ expectedSpec := iceberg.NewPartitionSpec(
+ iceberg.PartitionField{SourceID: 2, FieldID: 1000, Transform:
iceberg.IdentityTransform{}, Name: "bar"})
+
+ expectedSortOrder := SortOrder{
+ OrderID: 1,
+ Fields: []SortField{{
+ SourceID: 1, Transform: iceberg.IdentityTransform{},
+ Direction: SortASC, NullOrder: NullsLast}}}
+
+ lastPartitionID := 1000
+ expected := &metadataV2{
+ commonMetadata: commonMetadata{
+ Loc: "s3://some_v1_location/",
+ UUID: tableUUID,
+ LastUpdatedMS: actual.LastUpdatedMillis(),
+ LastColumnId: 3,
+ SchemaList: []*iceberg.Schema{expectedSchema},
+ CurrentSchemaID: 0,
+ Specs:
[]iceberg.PartitionSpec{expectedSpec},
+ DefaultSpecID: 0,
+ LastPartitionID: &lastPartitionID,
+ SortOrderList: []SortOrder{expectedSortOrder},
+ DefaultSortOrderID: 1,
+ FormatVersion: 2,
+ }}
+
+ assert.Truef(t, expected.Equals(actual), "expected: %s\ngot: %s",
expected, actual)
+}
diff --git a/table/sorting.go b/table/sorting.go
index 425a92e..e510439 100644
--- a/table/sorting.go
+++ b/table/sorting.go
@@ -175,3 +175,38 @@ func (s *SortOrder) UnmarshalJSON(b []byte) error {
return nil
}
+
+// AssignFreshSortOrderIDs updates and reassigns the field source IDs from the old schema
+// to the corresponding fields in the fresh schema, while also giving the Sort Order the
+// initial Sort Order ID (InitialSortOrderID).
+func AssignFreshSortOrderIDs(sortOrder SortOrder, old, fresh *iceberg.Schema)
(SortOrder, error) {
+ return AssignFreshSortOrderIDsWithID(sortOrder, old, fresh,
InitialSortOrderID)
+}
+
+// AssignFreshSortOrderIDsWithID is like AssignFreshSortOrderIDs but allows
specifying the id of the
+// returned SortOrder.
+func AssignFreshSortOrderIDsWithID(sortOrder SortOrder, old, fresh
*iceberg.Schema, sortOrderID int) (SortOrder, error) {
+ if sortOrder.Equals(UnsortedSortOrder) {
+ return UnsortedSortOrder, nil
+ }
+
+ fields := make([]SortField, 0, len(sortOrder.Fields))
+ for _, field := range sortOrder.Fields {
+ originalField, ok := old.FindColumnName(field.SourceID)
+ if !ok {
+ return SortOrder{}, fmt.Errorf("cannot find source
column id %s in old schema", field.String())
+ }
+ freshField, ok := fresh.FindFieldByName(originalField)
+ if !ok {
+ return SortOrder{}, fmt.Errorf("cannot find field %s in
fresh schema", originalField)
+ }
+
+ fields = append(fields, SortField{
+ SourceID: freshField.ID,
+ Transform: field.Transform,
+ Direction: field.Direction,
+ NullOrder: field.NullOrder,
+ })
+ }
+ return SortOrder{OrderID: sortOrderID, Fields: fields}, nil
+}