This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c5f3e4  feat: Impl rest catalog + table updates & requirements
9c5f3e4 is described below

commit 9c5f3e4011dbba41d4f147a0a7fd656c77007d23
Author: jwtryg <[email protected]>
AuthorDate: Tue Jan 7 22:39:57 2025 +0100

    feat: Impl rest catalog + table updates & requirements
    
    Implement remaining REST catalog functions and basic framework for table 
updates/requirements.
---
 catalog/catalog.go                                 |   3 +-
 catalog/rest.go                                    | 265 +++++++--
 catalog/rest_test.go                               | 100 ++++
 go.mod                                             |   3 +-
 go.sum                                             |   8 +-
 partitions.go                                      |   6 +
 table/metadata.go                                  | 618 ++++++++++++++++++++-
 ...{metadata_test.go => metadata_internal_test.go} |  61 +-
 table/requirements.go                              | 267 +++++++++
 table/table_test.go                                |   4 +-
 table/updates.go                                   | 379 +++++++++++++
 11 files changed, 1615 insertions(+), 99 deletions(-)

diff --git a/catalog/catalog.go b/catalog/catalog.go
index 65da7e5..9b5776d 100644
--- a/catalog/catalog.go
+++ b/catalog/catalog.go
@@ -45,6 +45,7 @@ var (
        ErrNoSuchTable            = errors.New("table does not exist")
        ErrNoSuchNamespace        = errors.New("namespace does not exist")
        ErrNamespaceAlreadyExists = errors.New("namespace already exists")
+       ErrTableAlreadyExists     = errors.New("table already exists")
 )
 
 // WithAwsConfig sets the AWS configuration for the catalog.
@@ -156,7 +157,7 @@ type Catalog interface {
        ListTables(ctx context.Context, namespace table.Identifier) 
([]table.Identifier, error)
        // LoadTable loads a table from the catalog and returns a Table with 
the metadata.
        LoadTable(ctx context.Context, identifier table.Identifier, props 
iceberg.Properties) (*table.Table, error)
-       // DropTable tells the catalog to drop the table entirely
+       // DropTable tells the catalog to drop the table entirely.
        DropTable(ctx context.Context, identifier table.Identifier) error
        // RenameTable tells the catalog to rename a given table by the 
identifiers
        // provided, and then loads and returns the destination table
diff --git a/catalog/rest.go b/catalog/rest.go
index ef9c332..47be1eb 100644
--- a/catalog/rest.go
+++ b/catalog/rest.go
@@ -84,6 +84,86 @@ func (e errorResponse) Error() string {
        return e.Type + ": " + e.Message
 }
 
+type identifier struct {
+       Namespace []string `json:"namespace"`
+       Name      string   `json:"name"`
+}
+
+type commitTableResponse struct {
+       MetadataLoc string          `json:"metadata-location"`
+       RawMetadata json.RawMessage `json:"metadata"`
+       Metadata    table.Metadata  `json:"-"`
+}
+
+func (t *commitTableResponse) UnmarshalJSON(b []byte) (err error) {
+       type Alias commitTableResponse
+       if err = json.Unmarshal(b, (*Alias)(t)); err != nil {
+               return err
+       }
+
+       t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata)
+       return
+}
+
+type loadTableResponse struct {
+       MetadataLoc string             `json:"metadata-location"`
+       RawMetadata json.RawMessage    `json:"metadata"`
+       Config      iceberg.Properties `json:"config"`
+       Metadata    table.Metadata     `json:"-"`
+}
+
+func (t *loadTableResponse) UnmarshalJSON(b []byte) (err error) {
+       type Alias loadTableResponse
+       if err = json.Unmarshal(b, (*Alias)(t)); err != nil {
+               return err
+       }
+
+       t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata)
+       return
+}
+
+type createTableOption func(*createTableRequest)
+
+func WithLocation(loc string) createTableOption {
+       return func(req *createTableRequest) {
+               req.Location = strings.TrimRight(loc, "/")
+       }
+}
+
+func WithPartitionSpec(spec *iceberg.PartitionSpec) createTableOption {
+       return func(req *createTableRequest) {
+               req.PartitionSpec = spec
+       }
+}
+
+func WithWriteOrder(order *table.SortOrder) createTableOption {
+       return func(req *createTableRequest) {
+               req.WriteOrder = order
+       }
+}
+
+func WithStageCreate() createTableOption {
+       return func(req *createTableRequest) {
+               req.StageCreate = true
+       }
+}
+
+func WithProperties(props iceberg.Properties) createTableOption {
+       return func(req *createTableRequest) {
+               req.Props = props
+       }
+}
+
+type createTableRequest struct {
+       Name          string                 `json:"name"`
+       Schema        *iceberg.Schema        `json:"schema"`
+       Location      string                 `json:"location,omitempty"`
+       PartitionSpec *iceberg.PartitionSpec `json:"partition-spec,omitempty"`
+       WriteOrder    *table.SortOrder       `json:"write-order,omitempty"`
+       StageCreate   bool                   `json:"stage-create,omitempty"`
+       Props         iceberg.Properties     `json:"properties,omitempty"`
+}
+
 type oauthTokenResponse struct {
        AccessToken  string `json:"access_token"`
        TokenType    string `json:"token_type"`
@@ -537,6 +617,20 @@ func checkValidNamespace(ident table.Identifier) error {
        return nil
 }
 
+func (r *RestCatalog) tableFromResponse(identifier []string, metadata 
table.Metadata, loc string, config iceberg.Properties) (*table.Table, error) {
+       id := identifier
+       if r.name != "" {
+               id = append([]string{r.name}, identifier...)
+       }
+
+       iofs, err := iceio.LoadFS(config, loc)
+       if err != nil {
+               return nil, err
+       }
+
+       return table.New(id, metadata, loc, iofs), nil
+}
+
 func (r *RestCatalog) ListTables(ctx context.Context, namespace 
table.Identifier) ([]table.Identifier, error) {
        if err := checkValidNamespace(namespace); err != nil {
                return nil, err
@@ -546,12 +640,8 @@ func (r *RestCatalog) ListTables(ctx context.Context, 
namespace table.Identifier
        path := []string{"namespaces", ns, "tables"}
 
        type resp struct {
-               Identifiers []struct {
-                       Namespace []string `json:"namespace"`
-                       Name      string   `json:"name"`
-               } `json:"identifiers"`
+               Identifiers []identifier `json:"identifiers"`
        }
-
        rsp, err := doGet[resp](ctx, r.baseURI, path, r.cl, 
map[int]error{http.StatusNotFound: ErrNoSuchNamespace})
        if err != nil {
                return nil, err
@@ -573,64 +663,156 @@ func splitIdentForPath(ident table.Identifier) (string, 
string, error) {
        return strings.Join(NamespaceFromIdent(ident), namespaceSeparator), 
TableNameFromIdent(ident), nil
 }
 
-type tblResponse struct {
-       MetadataLoc string             `json:"metadata-location"`
-       RawMetadata json.RawMessage    `json:"metadata"`
-       Config      iceberg.Properties `json:"config"`
-       Metadata    table.Metadata     `json:"-"`
-}
+func (r *RestCatalog) CreateTable(ctx context.Context, identifier 
table.Identifier, schema *iceberg.Schema, opts ...createTableOption) 
(*table.Table, error) {
+       ns, tbl, err := splitIdentForPath(identifier)
+       if err != nil {
+               return nil, err
+       }
 
-func (t *tblResponse) UnmarshalJSON(b []byte) (err error) {
-       type Alias tblResponse
-       if err = json.Unmarshal(b, (*Alias)(t)); err != nil {
-               return err
+       payload := createTableRequest{
+               Name:   tbl,
+               Schema: schema,
+       }
+       for _, o := range opts {
+               o(&payload)
        }
 
-       t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata)
-       return
+       ret, err := doPost[createTableRequest, loadTableResponse](ctx, 
r.baseURI, []string{"namespaces", ns, "tables"}, payload,
+               r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace, 
http.StatusConflict: ErrTableAlreadyExists})
+       if err != nil {
+               return nil, err
+       }
+
+       config := maps.Clone(r.props)
+       maps.Copy(config, ret.Metadata.Properties())
+       maps.Copy(config, ret.Config)
+
+       return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, 
config)
 }
 
-func (r *RestCatalog) LoadTable(ctx context.Context, identifier 
table.Identifier, props iceberg.Properties) (*table.Table, error) {
+func (r *RestCatalog) RegisterTable(ctx context.Context, identifier 
table.Identifier, metadataLoc string) (*table.Table, error) {
        ns, tbl, err := splitIdentForPath(identifier)
        if err != nil {
                return nil, err
        }
 
-       if props == nil {
-               props = iceberg.Properties{}
+       type payload struct {
+               Name        string `json:"name"`
+               MetadataLoc string `json:"metadata-location"`
        }
 
-       ret, err := doGet[tblResponse](ctx, r.baseURI, []string{"namespaces", 
ns, "tables", tbl},
-               r.cl, map[int]error{http.StatusNotFound: ErrNoSuchTable})
+       ret, err := doPost[payload, loadTableResponse](ctx, r.baseURI, 
[]string{"namespaces", ns, "tables", tbl},
+               payload{Name: tbl, MetadataLoc: metadataLoc}, r.cl, 
map[int]error{http.StatusNotFound: ErrNoSuchNamespace, http.StatusConflict: 
ErrTableAlreadyExists})
        if err != nil {
                return nil, err
        }
 
-       id := identifier
-       if r.name != "" {
-               id = append([]string{r.name}, identifier...)
+       config := maps.Clone(r.props)
+       maps.Copy(config, ret.Metadata.Properties())
+       maps.Copy(config, ret.Config)
+       return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, 
config)
+}
+
+func (r *RestCatalog) LoadTable(ctx context.Context, identifier 
table.Identifier, props iceberg.Properties) (*table.Table, error) {
+       ns, tbl, err := splitIdentForPath(identifier)
+       if err != nil {
+               return nil, err
        }
 
-       tblProps := maps.Clone(r.props)
-       maps.Copy(tblProps, props)
-       maps.Copy(tblProps, ret.Metadata.Properties())
+       ret, err := doGet[loadTableResponse](ctx, r.baseURI, 
[]string{"namespaces", ns, "tables", tbl},
+               r.cl, map[int]error{http.StatusNotFound: ErrNoSuchTable})
+       if err != nil {
+               return nil, err
+       }
+
+       config := maps.Clone(r.props)
+       maps.Copy(config, props)
+       maps.Copy(config, ret.Metadata.Properties())
        for k, v := range ret.Config {
-               tblProps[k] = v
+               config[k] = v
        }
 
-       iofs, err := iceio.LoadFS(tblProps, ret.MetadataLoc)
+       return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, 
config)
+}
+
+func (r *RestCatalog) UpdateTable(ctx context.Context, ident table.Identifier, 
requirements []table.Requirement, updates []table.Update) (*table.Table, error) 
{
+       ns, tbl, err := splitIdentForPath(ident)
        if err != nil {
                return nil, err
        }
-       return table.New(id, ret.Metadata, ret.MetadataLoc, iofs), nil
+
+       restIdentifier := identifier{
+               Namespace: NamespaceFromIdent(ident),
+               Name:      tbl,
+       }
+       type payload struct {
+               Identifier   identifier          `json:"identifier"`
+               Requirements []table.Requirement `json:"requirements"`
+               Updates      []table.Update      `json:"updates"`
+       }
+       ret, err := doPost[payload, commitTableResponse](ctx, r.baseURI, 
[]string{"namespaces", ns, "tables", tbl},
+               payload{Identifier: restIdentifier, Requirements: requirements, 
Updates: updates}, r.cl,
+               map[int]error{http.StatusNotFound: ErrNoSuchTable, 
http.StatusConflict: ErrCommitFailed})
+       if err != nil {
+               return nil, err
+       }
+
+       config := maps.Clone(r.props)
+       maps.Copy(config, ret.Metadata.Properties())
+
+       return r.tableFromResponse(ident, ret.Metadata, ret.MetadataLoc, config)
 }
 
 func (r *RestCatalog) DropTable(ctx context.Context, identifier 
table.Identifier) error {
-       return fmt.Errorf("%w: [Rest Catalog] drop table", 
iceberg.ErrNotImplemented)
+       ns, tbl, err := splitIdentForPath(identifier)
+       if err != nil {
+               return err
+       }
+
+       _, err = doDelete[struct{}](ctx, r.baseURI, []string{"namespaces", ns, 
"tables", tbl}, r.cl,
+               map[int]error{http.StatusNotFound: ErrNoSuchTable})
+
+       return err
+}
+
+func (r *RestCatalog) PurgeTable(ctx context.Context, identifier 
table.Identifier) error {
+       ns, tbl, err := splitIdentForPath(identifier)
+       if err != nil {
+               return err
+       }
+
+       uri := r.baseURI.JoinPath("namespaces", ns, "tables", tbl)
+       v := url.Values{}
+       v.Set("purgeRequested", "true")
+       uri.RawQuery = v.Encode()
+
+       _, err = doDelete[struct{}](ctx, uri, []string{}, r.cl,
+               map[int]error{http.StatusNotFound: ErrNoSuchTable})
+
+       return err
 }
 
 func (r *RestCatalog) RenameTable(ctx context.Context, from, to 
table.Identifier) (*table.Table, error) {
-       return nil, fmt.Errorf("%w: [Rest Catalog] rename table", 
iceberg.ErrNotImplemented)
+       type payload struct {
+               From identifier `json:"from"`
+               To   identifier `json:"to"`
+       }
+       f := identifier{
+               Namespace: NamespaceFromIdent(from),
+               Name:      TableNameFromIdent(from),
+       }
+       t := identifier{
+               Namespace: NamespaceFromIdent(to),
+               Name:      TableNameFromIdent(to),
+       }
+
+       _, err := doPost[payload, any](ctx, r.baseURI, []string{"tables", 
"rename"}, payload{From: f, To: t}, r.cl,
+               map[int]error{http.StatusNotFound: ErrNoSuchTable})
+       if err != nil {
+               return nil, err
+       }
+
+       return r.LoadTable(ctx, to, nil)
 }
 
 func (r *RestCatalog) CreateNamespace(ctx context.Context, namespace 
table.Identifier, props iceberg.Properties) error {
@@ -710,3 +892,20 @@ func (r *RestCatalog) UpdateNamespaceProperties(ctx 
context.Context, namespace t
        return doPost[payload, PropertiesUpdateSummary](ctx, r.baseURI, 
[]string{"namespaces", ns, "properties"},
                payload{Remove: removals, Updates: updates}, r.cl, 
map[int]error{http.StatusNotFound: ErrNoSuchNamespace})
 }
+
+func (r *RestCatalog) CheckNamespaceExists(ctx context.Context, namespace 
table.Identifier) (bool, error) {
+       if err := checkValidNamespace(namespace); err != nil {
+               return false, err
+       }
+
+       _, err := doGet[struct{}](ctx, r.baseURI, []string{"namespaces", 
strings.Join(namespace, namespaceSeparator)},
+               r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace})
+       if err != nil {
+               if errors.Is(err, ErrNoSuchNamespace) {
+                       return false, nil
+               }
+               return false, err
+       }
+
+       return true, nil
+}
diff --git a/catalog/rest_test.go b/catalog/rest_test.go
index 618c5e0..212d5a8 100644
--- a/catalog/rest_test.go
+++ b/catalog/rest_test.go
@@ -22,6 +22,7 @@ import (
        "crypto/tls"
        "crypto/x509"
        "encoding/json"
+       "fmt"
        "net/http"
        "net/http/httptest"
        "net/url"
@@ -624,6 +625,105 @@ func (r *RestCatalogSuite) TestUpdateNamespaceProps404() {
        r.ErrorContains(err, "Namespace does not exist: does_not_exist in 
warehouse")
 }
 
+var (
+       exampleTableMetadataNoSnapshotV1 = `{
+       "format-version": 1,
+       "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29",
+       "location": "s3://warehouse/database/table",
+       "last-updated-ms": 1657810967051,
+       "last-column-id": 3,
+       "schema": {
+               "type": "struct",
+               "schema-id": 0,
+               "identifier-field-ids": [2],
+               "fields": [
+                       {"id": 1, "name": "foo", "required": false, "type": 
"string"},
+                       {"id": 2, "name": "bar", "required": true, "type": 
"int"},
+                       {"id": 3, "name": "baz", "required": false, "type": 
"boolean"}
+               ]
+       },
+       "current-schema-id": 0,
+       "schemas": [
+               {
+                       "type": "struct",
+                       "schema-id": 0,
+                       "identifier-field-ids": [2],
+                       "fields": [
+                               {"id": 1, "name": "foo", "required": false, 
"type": "string"},
+                               {"id": 2, "name": "bar", "required": true, 
"type": "int"},
+                               {"id": 3, "name": "baz", "required": false, 
"type": "boolean"}
+                       ]
+               }
+       ],
+       "partition-spec": [],
+       "default-spec-id": 0,
+       "last-partition-id": 999,
+       "default-sort-order-id": 0,
+       "sort-orders": [{"order-id": 0, "fields": []}],
+       "properties": {
+               "write.delete.parquet.compression-codec": "zstd",
+               "write.metadata.compression-codec": "gzip",
+               "write.summary.partition-limit": "100",
+               "write.parquet.compression-codec": "zstd"
+       },
+       "current-snapshot-id": -1,
+       "refs": {},
+       "snapshots": [],
+       "snapshot-log": [],
+       "metadata-log": []
+}`
+
+       createTableRestExample = fmt.Sprintf(`{
+       "metadata-location": "s3://warehouse/database/table/metadata.json",
+       "metadata": %s,
+       "config": {
+               "client.factory": 
"io.tabular.iceberg.catalog.TabularAwsClientFactory",
+               "region": "us-west-2"
+       }
+}`, exampleTableMetadataNoSnapshotV1)
+
+       tableSchemaSimple = iceberg.NewSchemaWithIdentifiers(1, []int{2},
+               iceberg.NestedField{ID: 1, Name: "foo", Type: 
iceberg.StringType{}, Required: false},
+               iceberg.NestedField{ID: 2, Name: "bar", Type: 
iceberg.PrimitiveTypes.Int32, Required: true},
+               iceberg.NestedField{ID: 3, Name: "baz", Type: 
iceberg.PrimitiveTypes.Bool, Required: false},
+       )
+)
+
+func (r *RestCatalogSuite) TestCreateTable200() {
+       r.mux.HandleFunc("/v1/namespaces/fokko/tables", func(w 
http.ResponseWriter, req *http.Request) {
+               r.Require().Equal(http.MethodPost, req.Method)
+
+               for k, v := range TestHeaders {
+                       r.Equal(v, req.Header.Values(k))
+               }
+
+               w.Write([]byte(createTableRestExample))
+       })
+
+       t := createTableRestExample
+       _ = t
+       cat, err := catalog.NewRestCatalog("rest", r.srv.URL, 
catalog.WithOAuthToken(TestToken))
+       r.Require().NoError(err)
+
+       tbl, err := cat.CreateTable(
+               context.Background(),
+               catalog.ToRestIdentifier("fokko", "fokko2"),
+               tableSchemaSimple,
+       )
+       r.Require().NoError(err)
+
+       r.Equal(catalog.ToRestIdentifier("rest", "fokko", "fokko2"), 
tbl.Identifier())
+       r.Equal("s3://warehouse/database/table/metadata.json", 
tbl.MetadataLocation())
+       r.EqualValues(1, tbl.Metadata().Version())
+       r.Equal("bf289591-dcc0-4234-ad4f-5c3eed811a29", 
tbl.Metadata().TableUUID().String())
+       r.EqualValues(1657810967051, tbl.Metadata().LastUpdatedMillis())
+       r.Equal(3, tbl.Metadata().LastColumnID())
+       r.Zero(tbl.Schema().ID)
+       r.Zero(tbl.Metadata().DefaultPartitionSpec())
+       r.Equal(999, *tbl.Metadata().LastPartitionSpecID())
+       r.Equal(table.UnsortedSortOrder, tbl.SortOrder())
+}
+
 func (r *RestCatalogSuite) TestLoadTable200() {
        r.mux.HandleFunc("/v1/namespaces/fokko/tables/table", func(w 
http.ResponseWriter, req *http.Request) {
                r.Require().Equal(http.MethodGet, req.Method)
diff --git a/go.mod b/go.mod
index eb42b05..0c3f744 100644
--- a/go.mod
+++ b/go.mod
@@ -104,7 +104,8 @@ require (
        github.com/modern-go/reflect2 v1.0.2 // indirect
        github.com/pierrec/lz4/v4 v4.1.21 // indirect
        github.com/pmezard/go-difflib v1.0.0 // indirect
-       github.com/rivo/uniseg v0.4.4 // indirect
+       github.com/rivo/uniseg v0.4.7 // indirect
+       github.com/rogpeppe/go-internal v1.12.0 // indirect
        github.com/stretchr/objx v0.5.2 // indirect
        github.com/substrait-io/substrait v0.57.1 // indirect
        github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
diff --git a/go.sum b/go.sum
index 76872a9..2ecff2e 100644
--- a/go.sum
+++ b/go.sum
@@ -238,10 +238,10 @@ github.com/pterm/pterm v0.12.40/go.mod 
h1:ffwPLwlbXxP+rxT0GsgDTzS3y3rmpAO1NMjUkG
 github.com/pterm/pterm v0.12.80 h1:mM55B+GnKUnLMUSqhdINe4s6tOuVQIetQ3my8JGyAIg=
 github.com/pterm/pterm v0.12.80/go.mod 
h1:c6DeF9bSnOSeFPZlfs4ZRAFcf5SCoTwvwQ5xaKGQlHo=
 github.com/rivo/uniseg v0.2.0/go.mod 
h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
-github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
-github.com/rivo/uniseg v0.4.4/go.mod 
h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
-github.com/rogpeppe/go-internal v1.9.0 
h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
-github.com/rogpeppe/go-internal v1.9.0/go.mod 
h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
+github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
+github.com/rivo/uniseg v0.4.7/go.mod 
h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
+github.com/rogpeppe/go-internal v1.12.0 
h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
+github.com/rogpeppe/go-internal v1.12.0/go.mod 
h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
 github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
 github.com/sergi/go-diff v1.2.0/go.mod 
h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
 github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
diff --git a/partitions.go b/partitions.go
index 48b730c..9416d70 100644
--- a/partitions.go
+++ b/partitions.go
@@ -20,6 +20,7 @@ package iceberg
 import (
        "encoding/json"
        "fmt"
+       "iter"
        "slices"
        "strings"
 )
@@ -116,6 +117,11 @@ func (ps PartitionSpec) Equals(other PartitionSpec) bool {
        return ps.id == other.id && slices.Equal(ps.fields, other.fields)
 }
 
+// Fields returns an iterator over the partition fields in this spec.
+func (ps *PartitionSpec) Fields() iter.Seq[PartitionField] {
+       return slices.Values(ps.fields)
+}
+
 func (ps PartitionSpec) MarshalJSON() ([]byte, error) {
        if ps.fields == nil {
                ps.fields = []PartitionField{}
diff --git a/table/metadata.go b/table/metadata.go
index 47b3ffe..73021c5 100644
--- a/table/metadata.go
+++ b/table/metadata.go
@@ -22,14 +22,26 @@ import (
        "errors"
        "fmt"
        "io"
+       "iter"
        "maps"
        "slices"
+       "time"
 
        "github.com/apache/iceberg-go"
 
        "github.com/google/uuid"
 )
 
+const (
+       partitionFieldStartID       = 1000
+       supportedTableFormatVersion = 2
+
+       addPartionSpecAction = "add-partition-spec"
+       addSchemaAction      = "add-schema"
+       addSnapshotAction    = "add-snapshot"
+       addSortOrderAction   = "add-sort-order"
+)
+
 // Metadata for an iceberg table as specified in the Iceberg spec
 //
 // https://iceberg.apache.org/spec/#iceberg-table-spec
@@ -80,20 +92,542 @@ type Metadata interface {
        SnapshotByName(name string) *Snapshot
        // CurrentSnapshot returns the table's current snapshot.
        CurrentSnapshot() *Snapshot
+       // Ref returns the snapshot ref for the main branch.
+       Ref() SnapshotRef
+       // Refs returns a list of snapshot name/reference pairs.
+       Refs() iter.Seq2[string, SnapshotRef]
+       // SnapshotLogs returns the list of snapshot logs for the table.
+       SnapshotLogs() iter.Seq[SnapshotLogEntry]
        // SortOrder returns the table's current sort order, ie: the one with 
the
        // ID that matches the default-sort-order-id.
        SortOrder() SortOrder
        // SortOrders returns the list of sort orders in the table.
        SortOrders() []SortOrder
+       // DefaultSortOrder returns the ID of the current sort order that 
writers
+       // should use by default.
+       DefaultSortOrder() int
        // Properties is a string to string map of table properties. This is 
used
        // to control settings that affect reading and writing and is not 
intended
        // to be used for arbitrary metadata. For example, 
commit.retry.num-retries
        // is used to control the number of commit retries.
        Properties() iceberg.Properties
+       // PreviousFiles returns the list of metadata log entries for the table.
+       PreviousFiles() iter.Seq[MetadataLogEntry]
 
        Equals(Metadata) bool
 }
 
+type MetadataBuilder struct {
+       base    Metadata
+       updates []Update
+
+       // common fields
+       formatVersion      int
+       uuid               uuid.UUID
+       loc                string
+       lastUpdatedMS      int64
+       lastColumnId       int
+       schemaList         []*iceberg.Schema
+       currentSchemaID    int
+       specs              []iceberg.PartitionSpec
+       defaultSpecID      int
+       lastPartitionID    *int
+       props              iceberg.Properties
+       snapshotList       []Snapshot
+       currentSnapshotID  *int64
+       snapshotLog        []SnapshotLogEntry
+       metadataLog        []MetadataLogEntry
+       sortOrderList      []SortOrder
+       defaultSortOrderID int
+       refs               map[string]SnapshotRef
+
+       // V2 specific
+       lastSequenceNumber *int64
+}
+
+func NewMetadataBuilder() (*MetadataBuilder, error) {
+       return &MetadataBuilder{
+               updates:       make([]Update, 0),
+               schemaList:    make([]*iceberg.Schema, 0),
+               specs:         make([]iceberg.PartitionSpec, 0),
+               props:         make(iceberg.Properties),
+               snapshotList:  make([]Snapshot, 0),
+               snapshotLog:   make([]SnapshotLogEntry, 0),
+               metadataLog:   make([]MetadataLogEntry, 0),
+               sortOrderList: make([]SortOrder, 0),
+               refs:          make(map[string]SnapshotRef),
+       }, nil
+}
+
+func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) {
+       b := &MetadataBuilder{}
+       b.base = metadata
+
+       b.formatVersion = metadata.Version()
+       b.uuid = metadata.TableUUID()
+       b.loc = metadata.Location()
+       b.lastUpdatedMS = metadata.LastUpdatedMillis()
+       b.lastColumnId = metadata.LastColumnID()
+       b.schemaList = metadata.Schemas()
+       b.currentSchemaID = metadata.CurrentSchema().ID
+       b.specs = metadata.PartitionSpecs()
+       b.defaultSpecID = metadata.DefaultPartitionSpec()
+       b.lastPartitionID = metadata.LastPartitionSpecID()
+       b.props = metadata.Properties()
+       b.snapshotList = metadata.Snapshots()
+       b.currentSnapshotID = &metadata.CurrentSnapshot().SnapshotID
+       b.sortOrderList = metadata.SortOrders()
+       b.defaultSortOrderID = metadata.DefaultSortOrder()
+
+       b.refs = make(map[string]SnapshotRef)
+       for name, ref := range metadata.Refs() {
+               b.refs[name] = ref
+       }
+
+       b.snapshotLog = make([]SnapshotLogEntry, 0)
+       for log := range metadata.SnapshotLogs() {
+               b.snapshotLog = append(b.snapshotLog, log)
+       }
+
+       b.metadataLog = make([]MetadataLogEntry, 0)
+       for entry := range metadata.PreviousFiles() {
+               b.metadataLog = append(b.metadataLog, entry)
+       }
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema, newLastColumnID 
int, initial bool) (*MetadataBuilder, error) {
+       if newLastColumnID < b.lastColumnId {
+               return nil, fmt.Errorf("%w: newLastColumnID %d, must be >= %d", 
iceberg.ErrInvalidArgument, newLastColumnID, b.lastColumnId)
+       }
+
+       var schemas []*iceberg.Schema
+       if initial {
+               schemas = []*iceberg.Schema{schema}
+       } else {
+               schemas = append(b.schemaList, schema)
+       }
+
+       b.lastColumnId = newLastColumnID
+       b.schemaList = schemas
+       b.updates = append(b.updates, NewAddSchemaUpdate(schema, 
newLastColumnID, initial))
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddPartitionSpec(spec *iceberg.PartitionSpec, 
initial bool) (*MetadataBuilder, error) {
+       for _, s := range b.specs {
+               if s.ID() == spec.ID() && !initial {
+                       return nil, fmt.Errorf("partition spec with id %d 
already exists", spec.ID())
+               }
+       }
+
+       maxFieldID := 0
+       for f := range spec.Fields() {
+               maxFieldID = max(maxFieldID, f.FieldID)
+       }
+
+       prev := partitionFieldStartID - 1
+       if b.lastPartitionID != nil {
+               prev = *b.lastPartitionID
+       }
+       lastPartitionID := max(maxFieldID, prev)
+
+       var specs []iceberg.PartitionSpec
+       if initial {
+               specs = []iceberg.PartitionSpec{*spec}
+       } else {
+               specs = append(b.specs, *spec)
+       }
+
+       b.specs = specs
+       b.lastPartitionID = &lastPartitionID
+       b.updates = append(b.updates, NewAddPartitionSpecUpdate(spec, initial))
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) (*MetadataBuilder, 
error) {
+       if snapshot == nil {
+               return nil, nil
+       }
+
+       if len(b.schemaList) == 0 {
+               return nil, errors.New("can't add snapshot with no added 
schemas")
+       } else if len(b.specs) == 0 {
+               return nil, errors.New("can't add snapshot with no added 
partition specs")
+       } else if s, _ := b.SnapshotByID(snapshot.SnapshotID); s != nil {
+               return nil, fmt.Errorf("can't add snapshot with id %d, already 
exists", snapshot.SnapshotID)
+       } else if b.formatVersion == 2 &&
+               snapshot.SequenceNumber > 0 &&
+               snapshot.SequenceNumber <= *b.lastSequenceNumber &&
+               snapshot.ParentSnapshotID != nil {
+               return nil, fmt.Errorf("can't add snapshot with sequence number 
%d, must be > than last sequence number %d",
+                       snapshot.SequenceNumber, b.lastSequenceNumber)
+       }
+
+       b.updates = append(b.updates, NewAddSnapshotUpdate(snapshot))
+       b.lastUpdatedMS = snapshot.TimestampMs
+       b.lastSequenceNumber = &snapshot.SequenceNumber
+       b.snapshotList = append(b.snapshotList, *snapshot)
+       return b, nil
+}
+
+func (b *MetadataBuilder) AddSortOrder(sortOrder *SortOrder, initial bool) 
(*MetadataBuilder, error) {
+       var sortOrders []SortOrder
+       if !initial {
+               sortOrders = append(sortOrders, b.sortOrderList...)
+       }
+
+       for _, s := range sortOrders {
+               if s.OrderID == sortOrder.OrderID {
+                       return nil, fmt.Errorf("sort order with id %d already 
exists", sortOrder.OrderID)
+               }
+       }
+
+       b.sortOrderList = append(sortOrders, *sortOrder)
+       b.updates = append(b.updates, NewAddSortOrderUpdate(sortOrder, initial))
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) RemoveProperties(keys []string) (*MetadataBuilder, 
error) {
+       if len(keys) == 0 {
+               return b, nil
+       }
+
+       b.updates = append(b.updates, NewRemovePropertiesUpdate(keys))
+       for _, key := range keys {
+               delete(b.props, key)
+       }
+
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int) 
(*MetadataBuilder, error) {
+       if currentSchemaID == -1 {
+               currentSchemaID = maxBy(b.schemaList, func(s *iceberg.Schema) 
int {
+                       return s.ID
+               })
+               if !slices.ContainsFunc(b.updates, func(u Update) bool {
+                       return u.Action() == addSchemaAction && 
u.(*addSchemaUpdate).Schema.ID == currentSchemaID
+               }) {
+                       return nil, errors.New("can't set current schema to 
last added schema, no schema has been added")
+               }
+       }
+
+       if currentSchemaID == b.currentSchemaID {
+               return b, nil
+       }
+
+       _, err := b.GetSchemaByID(currentSchemaID)
+       if err != nil {
+               return nil, fmt.Errorf("can't set current schema to schema with 
id %d: %w", currentSchemaID, err)
+       }
+
+       b.updates = append(b.updates, 
NewSetCurrentSchemaUpdate(currentSchemaID))
+       b.currentSchemaID = currentSchemaID
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetDefaultSortOrderID(defaultSortOrderID int) 
(*MetadataBuilder, error) {
+       if defaultSortOrderID == -1 {
+               defaultSortOrderID = maxBy(b.sortOrderList, func(s SortOrder) 
int {
+                       return s.OrderID
+               })
+               if !slices.ContainsFunc(b.updates, func(u Update) bool {
+                       return u.Action() == addSortOrderAction && 
u.(*addSortOrderUpdate).SortOrder.OrderID == defaultSortOrderID
+               }) {
+                       return nil, fmt.Errorf("can't set default sort order to 
last added with no added sort orders")
+               }
+       }
+
+       if defaultSortOrderID == b.defaultSortOrderID {
+               return b, nil
+       }
+
+       if _, err := b.GetSortOrderByID(defaultSortOrderID); err != nil {
+               return nil, fmt.Errorf("can't set default sort order to sort 
order with id %d: %w", defaultSortOrderID, err)
+       }
+
+       b.updates = append(b.updates, 
NewSetDefaultSortOrderUpdate(defaultSortOrderID))
+       b.defaultSortOrderID = defaultSortOrderID
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID int) 
(*MetadataBuilder, error) {
+       if defaultSpecID == -1 {
+               defaultSpecID = maxBy(b.specs, func(s iceberg.PartitionSpec) 
int {
+                       return s.ID()
+               })
+               if !slices.ContainsFunc(b.updates, func(u Update) bool {
+                       return u.Action() == addPartionSpecAction && 
u.(*addPartitionSpecUpdate).Spec.ID() == defaultSpecID
+               }) {
+                       return nil, fmt.Errorf("can't set default spec to last 
added with no added partition specs")
+               }
+       }
+
+       if defaultSpecID == b.defaultSpecID {
+               return b, nil
+       }
+
+       if _, err := b.GetSpecByID(defaultSpecID); err != nil {
+               return nil, fmt.Errorf("can't set default spec to spec with id 
%d: %w", defaultSpecID, err)
+       }
+
+       b.updates = append(b.updates, NewSetDefaultSpecUpdate(defaultSpecID))
+       b.defaultSpecID = defaultSpecID
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetFormatVersion(formatVersion int) 
(*MetadataBuilder, error) {
+       if formatVersion < b.formatVersion {
+               return nil, fmt.Errorf("downgrading format version from %d to 
%d is not allowed",
+                       b.formatVersion, formatVersion)
+       }
+
+       if formatVersion > supportedTableFormatVersion {
+               return nil, fmt.Errorf("unsupported format version %d", 
formatVersion)
+       }
+
+       if formatVersion == b.formatVersion {
+               return b, nil
+       }
+
+       b.updates = append(b.updates, 
NewUpgradeFormatVersionUpdate(formatVersion))
+       b.formatVersion = formatVersion
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetLoc(loc string) (*MetadataBuilder, error) {
+       if b.loc == loc {
+               return b, nil
+       }
+
+       b.updates = append(b.updates, NewSetLocationUpdate(loc))
+       b.loc = loc
+       return b, nil
+}
+
+func (b *MetadataBuilder) SetProperties(props iceberg.Properties) 
(*MetadataBuilder, error) {
+       if len(props) == 0 {
+               return b, nil
+       }
+
+       b.updates = append(b.updates, NewSetPropertiesUpdate(props))
+       maps.Copy(b.props, props)
+       return b, nil
+}
+
+type setSnapshotRefOption func(*SnapshotRef) error
+
+func WithMaxRefAgeMs(maxRefAgeMs int64) setSnapshotRefOption {
+       return func(ref *SnapshotRef) error {
+               if maxRefAgeMs <= 0 {
+                       return fmt.Errorf("%w: maxRefAgeMs %d, must be > 0", 
iceberg.ErrInvalidArgument, maxRefAgeMs)
+               }
+               ref.MaxRefAgeMs = &maxRefAgeMs
+               return nil
+       }
+}
+
+func WithMaxSnapshotAgeMs(maxSnapshotAgeMs int64) setSnapshotRefOption {
+       return func(ref *SnapshotRef) error {
+               if maxSnapshotAgeMs <= 0 {
+                       return fmt.Errorf("%w: maxSnapshotAgeMs %d, must be > 
0", iceberg.ErrInvalidArgument, maxSnapshotAgeMs)
+               }
+               ref.MaxSnapshotAgeMs = &maxSnapshotAgeMs
+               return nil
+       }
+}
+
+func WithMinSnapshotsToKeep(minSnapshotsToKeep int) setSnapshotRefOption {
+       return func(ref *SnapshotRef) error {
+               if minSnapshotsToKeep <= 0 {
+                       return fmt.Errorf("%w: minSnapshotsToKeep %d, must be > 
0", iceberg.ErrInvalidArgument, minSnapshotsToKeep)
+               }
+               ref.MinSnapshotsToKeep = &minSnapshotsToKeep
+               return nil
+       }
+}
+
// SetSnapshotRef creates or replaces the named snapshot reference (branch or
// tag) so that it points at snapshotID. Optional retention limits are applied
// via options, each of which validates its own value. Setting a ref identical
// to the existing one is a no-op; the referenced snapshot must already be in
// the builder's snapshot list.
func (b *MetadataBuilder) SetSnapshotRef(
	name string,
	snapshotID int64,
	refType RefType,
	options ...setSnapshotRefOption,
) (*MetadataBuilder, error) {
	ref := SnapshotRef{
		SnapshotID:      snapshotID,
		SnapshotRefType: refType,
	}
	for _, opt := range options {
		if err := opt(&ref); err != nil {
			return nil, fmt.Errorf("invalid snapshot ref option: %w", err)
		}
	}

	// Flatten the optional fields to scalars for the update record below;
	// a nil pointer becomes the zero value.
	var maxRefAgeMs, maxSnapshotAgeMs int64
	var minSnapshotsToKeep int
	if ref.MaxRefAgeMs != nil {
		maxRefAgeMs = *ref.MaxRefAgeMs
	}
	if ref.MaxSnapshotAgeMs != nil {
		maxSnapshotAgeMs = *ref.MaxSnapshotAgeMs
	}
	if ref.MinSnapshotsToKeep != nil {
		minSnapshotsToKeep = *ref.MinSnapshotsToKeep
	}

	// Idempotence: nothing to do when an identical ref already exists.
	if existingRef, ok := b.refs[name]; ok && existingRef.Equals(ref) {
		return b, nil
	}

	snapshot, err := b.SnapshotByID(snapshotID)
	if err != nil {
		return nil, fmt.Errorf("can't set snapshot ref %s to unknown snapshot %d: %w", name, snapshotID, err)
	}

	// Moving the main branch also moves the current snapshot and extends the
	// snapshot log. NOTE(review): the set-snapshot-ref update is recorded only
	// for the main branch, and lastUpdatedMS set here may be overwritten just
	// below when the snapshot was added by this builder — confirm both are
	// intended.
	if refType == MainBranch {
		b.updates = append(b.updates, NewSetSnapshotRefUpdate(name, snapshotID, refType, maxRefAgeMs, maxSnapshotAgeMs, minSnapshotsToKeep))
		b.currentSnapshotID = &snapshotID
		b.snapshotLog = append(b.snapshotLog, SnapshotLogEntry{
			SnapshotID:  snapshotID,
			TimestampMs: snapshot.TimestampMs,
		})
		b.lastUpdatedMS = time.Now().Local().UnixMilli()
	}

	// If the snapshot was added through this builder, reuse its timestamp as
	// the last-updated time instead of the wall clock.
	if slices.ContainsFunc(b.updates, func(u Update) bool {
		return u.Action() == addSnapshotAction && u.(*addSnapshotUpdate).Snapshot.SnapshotID == snapshotID
	}) {
		b.lastUpdatedMS = snapshot.TimestampMs
	}

	b.refs[name] = ref
	return b, nil
}
+
+func (b *MetadataBuilder) SetUUID(uuid uuid.UUID) (*MetadataBuilder, error) {
+       if b.uuid == uuid {
+               return b, nil
+       }
+
+       b.updates = append(b.updates, NewAssignUUIDUpdate(uuid))
+       b.uuid = uuid
+       return b, nil
+}
+
+func (b *MetadataBuilder) buildCommonMetadata() *commonMetadata {
+       return &commonMetadata{
+               FormatVersion:      b.formatVersion,
+               UUID:               b.uuid,
+               Loc:                b.loc,
+               LastUpdatedMS:      b.lastUpdatedMS,
+               LastColumnId:       b.lastColumnId,
+               SchemaList:         b.schemaList,
+               CurrentSchemaID:    b.currentSchemaID,
+               Specs:              b.specs,
+               DefaultSpecID:      b.defaultSpecID,
+               LastPartitionID:    b.lastPartitionID,
+               Props:              b.props,
+               SnapshotList:       b.snapshotList,
+               CurrentSnapshotID:  b.currentSnapshotID,
+               SnapshotLog:        b.snapshotLog,
+               MetadataLog:        b.metadataLog,
+               SortOrderList:      b.sortOrderList,
+               DefaultSortOrderID: b.defaultSortOrderID,
+               SnapshotRefs:       b.refs,
+       }
+}
+
+func (b *MetadataBuilder) GetSchemaByID(id int) (*iceberg.Schema, error) {
+       for _, s := range b.schemaList {
+               if s.ID == id {
+                       return s, nil
+               }
+       }
+
+       return nil, fmt.Errorf("%w: schema with id %d not found", 
iceberg.ErrInvalidArgument, id)
+}
+
+func (b *MetadataBuilder) GetSpecByID(id int) (*iceberg.PartitionSpec, error) {
+       for _, s := range b.specs {
+               if s.ID() == id {
+                       return &s, nil
+               }
+       }
+
+       return nil, fmt.Errorf("partition spec with id %d not found", id)
+}
+
+func (b *MetadataBuilder) GetSortOrderByID(id int) (*SortOrder, error) {
+       for _, s := range b.sortOrderList {
+               if s.OrderID == id {
+                       return &s, nil
+               }
+       }
+
+       return nil, fmt.Errorf("sort order with id %d not found", id)
+}
+
+func (b *MetadataBuilder) SnapshotByID(id int64) (*Snapshot, error) {
+       for _, s := range b.snapshotList {
+               if s.SnapshotID == id {
+                       return &s, nil
+               }
+       }
+
+       return nil, fmt.Errorf("snapshot with id %d not found", id)
+}
+
// Build materializes the accumulated builder state into an immutable Metadata
// value matching the builder's format version (1 or 2).
func (b *MetadataBuilder) Build() (Metadata, error) {
	common := b.buildCommonMetadata()
	switch b.formatVersion {
	case 1:
		// v1 additionally embeds the current schema and the default partition
		// spec's fields directly in the metadata struct.
		schema, err := b.GetSchemaByID(b.currentSchemaID)
		if err != nil {
			return nil, fmt.Errorf("can't build metadata, missing schema for schema ID %d: %w", b.currentSchemaID, err)
		}

		partition, err := b.GetSpecByID(b.defaultSpecID)
		if err != nil {
			return nil, fmt.Errorf("can't build metadata, missing partition spec for spec ID %d: %w", b.defaultSpecID, err)
		}

		partitionFields := make([]iceberg.PartitionField, 0)
		for f := range partition.Fields() {
			partitionFields = append(partitionFields, f)
		}

		return &metadataV1{
			Schema:         schema,
			Partition:      partitionFields,
			commonMetadata: *common,
		}, nil

	case 2:
		// NOTE(review): assumes lastSequenceNumber is non-nil whenever the
		// format version is 2 — confirm the builder guarantees this, otherwise
		// this dereference panics.
		return &metadataV2{
			LastSequenceNumber: *b.lastSequenceNumber,
			commonMetadata:     *common,
		}, nil

	default:
		// Format version is validated by SetFormatVersion, so any other value
		// indicates a programming error.
		panic("unreachable: invalid format version")
	}
}
+
// maxBy returns the maximum value of extract(e) for all e in elems.
// If elems is empty, returns 0.
func maxBy[S ~[]E, E any](elems S, extract func(e E) int) int {
	if len(elems) == 0 {
		return 0
	}

	// Seed from the first element rather than 0 so that the true maximum is
	// returned even when all extracted values are negative.
	m := extract(elems[0])
	for _, e := range elems[1:] {
		m = max(m, extract(e))
	}

	return m
}
+
 var (
        ErrInvalidMetadataFormatVersion = errors.New("invalid or missing 
format-version in table metadata")
        ErrInvalidMetadata              = errors.New("invalid metadata")
@@ -128,9 +662,9 @@ func ParseMetadataBytes(b []byte) (Metadata, error) {
        var ret Metadata
        switch ver.FormatVersion {
        case 1:
-               ret = &MetadataV1{}
+               ret = &metadataV1{}
        case 2:
-               ret = &MetadataV2{}
+               ret = &metadataV2{}
        default:
                return nil, ErrInvalidMetadataFormatVersion
        }
@@ -163,10 +697,28 @@ type commonMetadata struct {
        MetadataLog        []MetadataLogEntry      `json:"metadata-log"`
        SortOrderList      []SortOrder             `json:"sort-orders"`
        DefaultSortOrderID int                     
`json:"default-sort-order-id"`
-       Refs               map[string]SnapshotRef  `json:"refs"`
+       SnapshotRefs       map[string]SnapshotRef  `json:"refs"`
+}
+
+func (c *commonMetadata) Ref() SnapshotRef                     { return 
c.SnapshotRefs[MainBranch] }
+func (c *commonMetadata) Refs() iter.Seq2[string, SnapshotRef] { return 
maps.All(c.SnapshotRefs) }
+func (c *commonMetadata) SnapshotLogs() iter.Seq[SnapshotLogEntry] {
+       return slices.Values(c.SnapshotLog)
+}
+
+func (c *commonMetadata) PreviousFiles() iter.Seq[MetadataLogEntry] {
+       return slices.Values(c.MetadataLog)
 }
 
 func (c *commonMetadata) Equals(other *commonMetadata) bool {
+       if other == nil {
+               return false
+       }
+
+       if c == other {
+               return true
+       }
+
        switch {
        case c.LastPartitionID == nil && other.LastPartitionID != nil:
                fallthrough
@@ -187,7 +739,7 @@ func (c *commonMetadata) Equals(other *commonMetadata) bool 
{
                fallthrough
        case !maps.Equal(c.Props, other.Props):
                fallthrough
-       case !maps.EqualFunc(c.Refs, other.Refs, func(sr1, sr2 SnapshotRef) 
bool { return sr1.Equals(sr2) }):
+       case !maps.EqualFunc(c.SnapshotRefs, other.SnapshotRefs, func(sr1, sr2 
SnapshotRef) bool { return sr1.Equals(sr2) }):
                return false
        }
 
@@ -245,7 +797,7 @@ func (c *commonMetadata) SnapshotByID(id int64) *Snapshot {
 }
 
 func (c *commonMetadata) SnapshotByName(name string) *Snapshot {
-       if ref, ok := c.Refs[name]; ok {
+       if ref, ok := c.SnapshotRefs[name]; ok {
                return c.SnapshotByID(ref.SnapshotID)
        }
        return nil
@@ -268,6 +820,10 @@ func (c *commonMetadata) SortOrder() SortOrder {
        return UnsortedSortOrder
 }
 
+func (c *commonMetadata) DefaultSortOrder() int {
+       return c.DefaultSortOrderID
+}
+
 func (c *commonMetadata) Properties() iceberg.Properties {
        return c.Props
 }
@@ -284,8 +840,8 @@ func (c *commonMetadata) preValidate() {
        }
 
        if c.CurrentSnapshotID != nil {
-               if _, ok := c.Refs[MainBranch]; !ok {
-                       c.Refs[MainBranch] = SnapshotRef{
+               if _, ok := c.SnapshotRefs[MainBranch]; !ok {
+                       c.SnapshotRefs[MainBranch] = SnapshotRef{
                                SnapshotID:      *c.CurrentSnapshotID,
                                SnapshotRefType: BranchRef,
                        }
@@ -296,8 +852,8 @@ func (c *commonMetadata) preValidate() {
                c.MetadataLog = []MetadataLogEntry{}
        }
 
-       if c.Refs == nil {
-               c.Refs = make(map[string]SnapshotRef)
+       if c.SnapshotRefs == nil {
+               c.SnapshotRefs = make(map[string]SnapshotRef)
        }
 
        if c.SnapshotLog == nil {
@@ -370,26 +926,30 @@ func (c *commonMetadata) validate() error {
 
 func (c *commonMetadata) Version() int { return c.FormatVersion }
 
-type MetadataV1 struct {
-       Schema    iceberg.Schema           `json:"schema"`
+type metadataV1 struct {
+       Schema    *iceberg.Schema          `json:"schema"`
        Partition []iceberg.PartitionField `json:"partition-spec"`
 
        commonMetadata
 }
 
-func (m *MetadataV1) Equals(other Metadata) bool {
-       rhs, ok := other.(*MetadataV1)
+func (m *metadataV1) Equals(other Metadata) bool {
+       rhs, ok := other.(*metadataV1)
        if !ok {
                return false
        }
 
-       return m.Schema.Equals(&rhs.Schema) && slices.Equal(m.Partition, 
rhs.Partition) &&
+       if m == rhs {
+               return true
+       }
+
+       return m.Schema.Equals(rhs.Schema) && slices.Equal(m.Partition, 
rhs.Partition) &&
                m.commonMetadata.Equals(&rhs.commonMetadata)
 }
 
-func (m *MetadataV1) preValidate() {
-       if len(m.SchemaList) == 0 {
-               m.SchemaList = []*iceberg.Schema{&m.Schema}
+func (m *metadataV1) preValidate() {
+       if len(m.SchemaList) == 0 && m.Schema != nil {
+               m.SchemaList = []*iceberg.Schema{m.Schema}
        }
 
        if len(m.Specs) == 0 {
@@ -416,8 +976,8 @@ func (m *MetadataV1) preValidate() {
        m.commonMetadata.preValidate()
 }
 
-func (m *MetadataV1) UnmarshalJSON(b []byte) error {
-       type Alias MetadataV1
+func (m *metadataV1) UnmarshalJSON(b []byte) error {
+       type Alias metadataV1
        aux := (*Alias)(m)
 
        if err := json.Unmarshal(b, aux); err != nil {
@@ -428,34 +988,38 @@ func (m *MetadataV1) UnmarshalJSON(b []byte) error {
        return m.validate()
 }
 
-func (m *MetadataV1) ToV2() MetadataV2 {
+func (m *metadataV1) ToV2() metadataV2 {
        commonOut := m.commonMetadata
        commonOut.FormatVersion = 2
        if commonOut.UUID.String() == "" {
                commonOut.UUID = uuid.New()
        }
 
-       return MetadataV2{commonMetadata: commonOut}
+       return metadataV2{commonMetadata: commonOut}
 }
 
-type MetadataV2 struct {
-       LastSequenceNumber int `json:"last-sequence-number"`
+type metadataV2 struct {
+       LastSequenceNumber int64 `json:"last-sequence-number"`
 
        commonMetadata
 }
 
-func (m *MetadataV2) Equals(other Metadata) bool {
-       rhs, ok := other.(*MetadataV2)
+func (m *metadataV2) Equals(other Metadata) bool {
+       rhs, ok := other.(*metadataV2)
        if !ok {
                return false
        }
 
+       if m == rhs {
+               return true
+       }
+
        return m.LastSequenceNumber == rhs.LastSequenceNumber &&
                m.commonMetadata.Equals(&rhs.commonMetadata)
 }
 
-func (m *MetadataV2) UnmarshalJSON(b []byte) error {
-       type Alias MetadataV2
+func (m *metadataV2) UnmarshalJSON(b []byte) error {
+       type Alias metadataV2
        aux := (*Alias)(m)
 
        if err := json.Unmarshal(b, aux); err != nil {
diff --git a/table/metadata_test.go b/table/metadata_internal_test.go
similarity index 93%
rename from table/metadata_test.go
rename to table/metadata_internal_test.go
index e268d88..a02ac7f 100644
--- a/table/metadata_test.go
+++ b/table/metadata_internal_test.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package table_test
+package table
 
 import (
        "encoding/json"
@@ -23,7 +23,6 @@ import (
        "testing"
 
        "github.com/apache/iceberg-go"
-       "github.com/apache/iceberg-go/table"
        "github.com/google/uuid"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
@@ -112,14 +111,14 @@ const ExampleTableMetadataV1 = `{
 }`
 
 func TestMetadataV1Parsing(t *testing.T) {
-       meta, err := table.ParseMetadataBytes([]byte(ExampleTableMetadataV1))
+       meta, err := ParseMetadataBytes([]byte(ExampleTableMetadataV1))
        require.NoError(t, err)
        require.NotNil(t, meta)
 
-       assert.IsType(t, (*table.MetadataV1)(nil), meta)
+       assert.IsType(t, (*metadataV1)(nil), meta)
        assert.Equal(t, 1, meta.Version())
 
-       data := meta.(*table.MetadataV1)
+       data := meta.(*metadataV1)
        assert.Equal(t, uuid.MustParse("d20125c8-7284-442c-9aea-15fee620737c"), 
meta.TableUUID())
        assert.Equal(t, "s3://bucket/test/location", meta.Location())
        assert.Equal(t, int64(1602638573874), meta.LastUpdatedMillis())
@@ -156,21 +155,21 @@ func TestMetadataV1Parsing(t *testing.T) {
        assert.Nil(t, meta.SnapshotByID(0))
        assert.Nil(t, meta.SnapshotByName("foo"))
        assert.Zero(t, data.DefaultSortOrderID)
-       assert.Equal(t, table.UnsortedSortOrder, meta.SortOrder())
+       assert.Equal(t, UnsortedSortOrder, meta.SortOrder())
 }
 
 func TestMetadataV2Parsing(t *testing.T) {
-       meta, err := table.ParseMetadataBytes([]byte(ExampleTableMetadataV2))
+       meta, err := ParseMetadataBytes([]byte(ExampleTableMetadataV2))
        require.NoError(t, err)
        require.NotNil(t, meta)
 
-       assert.IsType(t, (*table.MetadataV2)(nil), meta)
+       assert.IsType(t, (*metadataV2)(nil), meta)
        assert.Equal(t, 2, meta.Version())
 
-       data := meta.(*table.MetadataV2)
+       data := meta.(*metadataV2)
        assert.Equal(t, uuid.MustParse("9c12d441-03fe-4693-9a96-a0705ddf69c1"), 
data.UUID)
        assert.Equal(t, "s3://bucket/test/location", data.Location())
-       assert.Equal(t, 34, data.LastSequenceNumber)
+       assert.Equal(t, int64(34), data.LastSequenceNumber)
        assert.Equal(t, int64(1602638573590), data.LastUpdatedMS)
        assert.Equal(t, 3, data.LastColumnId)
        assert.Equal(t, 0, data.SchemaList[0].ID)
@@ -192,7 +191,7 @@ func TestMetadataV2Parsing(t *testing.T) {
 }
 
 func TestParsingCorrectTypes(t *testing.T) {
-       var meta table.MetadataV2
+       var meta metadataV2
        require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV2), 
&meta))
 
        assert.IsType(t, &iceberg.Schema{}, meta.SchemaList[0])
@@ -201,7 +200,7 @@ func TestParsingCorrectTypes(t *testing.T) {
 }
 
 func TestSerializeMetadataV1(t *testing.T) {
-       var meta table.MetadataV1
+       var meta metadataV1
        require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV1), 
&meta))
 
        data, err := json.Marshal(&meta)
@@ -212,7 +211,7 @@ func TestSerializeMetadataV1(t *testing.T) {
 }
 
 func TestSerializeMetadataV2(t *testing.T) {
-       var meta table.MetadataV2
+       var meta metadataV2
        require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV2), 
&meta))
 
        data, err := json.Marshal(&meta)
@@ -243,9 +242,9 @@ func TestInvalidFormatVersion(t *testing.T) {
         "snapshots": []
     }`
 
-       _, err := table.ParseMetadataBytes([]byte(metadataInvalidFormat))
+       _, err := ParseMetadataBytes([]byte(metadataInvalidFormat))
        assert.Error(t, err)
-       assert.ErrorIs(t, err, table.ErrInvalidMetadataFormatVersion)
+       assert.ErrorIs(t, err, ErrInvalidMetadataFormatVersion)
 }
 
 func TestCurrentSchemaNotFound(t *testing.T) {
@@ -278,9 +277,9 @@ func TestCurrentSchemaNotFound(t *testing.T) {
         "snapshots": []
     }`
 
-       _, err := table.ParseMetadataBytes([]byte(schemaNotFound))
+       _, err := ParseMetadataBytes([]byte(schemaNotFound))
        assert.Error(t, err)
-       assert.ErrorIs(t, err, table.ErrInvalidMetadata)
+       assert.ErrorIs(t, err, ErrInvalidMetadata)
        assert.ErrorContains(t, err, "current-schema-id 2 can't be found in any 
schema")
 }
 
@@ -322,9 +321,9 @@ func TestSortOrderNotFound(t *testing.T) {
         "snapshots": []
     }`
 
-       _, err := table.ParseMetadataBytes([]byte(metadataSortOrderNotFound))
+       _, err := ParseMetadataBytes([]byte(metadataSortOrderNotFound))
        assert.Error(t, err)
-       assert.ErrorIs(t, err, table.ErrInvalidMetadata)
+       assert.ErrorIs(t, err, ErrInvalidMetadata)
        assert.ErrorContains(t, err, "default-sort-order-id 4 can't be found in 
[3: [\n2 asc nulls-first\nbucket[4](3) desc nulls-last\n]]")
 }
 
@@ -358,10 +357,10 @@ func TestSortOrderUnsorted(t *testing.T) {
         "snapshots": []
     }`
 
-       var meta table.MetadataV2
+       var meta metadataV2
        require.NoError(t, json.Unmarshal([]byte(sortOrderUnsorted), &meta))
 
-       assert.Equal(t, table.UnsortedSortOrderID, meta.DefaultSortOrderID)
+       assert.Equal(t, UnsortedSortOrderID, meta.DefaultSortOrderID)
        assert.Len(t, meta.SortOrderList, 0)
 }
 
@@ -394,28 +393,28 @@ func TestInvalidPartitionSpecID(t *testing.T) {
         "last-partition-id": 1000
     }`
 
-       var meta table.MetadataV2
+       var meta metadataV2
        err := json.Unmarshal([]byte(invalidSpecID), &meta)
-       assert.ErrorIs(t, err, table.ErrInvalidMetadata)
+       assert.ErrorIs(t, err, ErrInvalidMetadata)
        assert.ErrorContains(t, err, "default-spec-id 1 can't be found")
 }
 
 func TestV2RefCreation(t *testing.T) {
-       var meta table.MetadataV2
+       var meta metadataV2
        require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV2), 
&meta))
 
        maxRefAge := int64(10000000)
-       assert.Equal(t, map[string]table.SnapshotRef{
+       assert.Equal(t, map[string]SnapshotRef{
                "main": {
                        SnapshotID:      3055729675574597004,
-                       SnapshotRefType: table.BranchRef,
+                       SnapshotRefType: BranchRef,
                },
                "test": {
                        SnapshotID:      3051729675574597004,
-                       SnapshotRefType: table.TagRef,
+                       SnapshotRefType: TagRef,
                        MaxRefAgeMs:     &maxRefAge,
                },
-       }, meta.Refs)
+       }, meta.SnapshotRefs)
 }
 
 func TestV1WriteMetadataToV2(t *testing.T) {
@@ -453,11 +452,11 @@ func TestV1WriteMetadataToV2(t *testing.T) {
                "snapshots": [{"snapshot-id": 1925, "timestamp-ms": 
1602638573822}]
        }`
 
-       meta, err := table.ParseMetadataString(minimalV1Example)
+       meta, err := ParseMetadataString(minimalV1Example)
        require.NoError(t, err)
-       assert.IsType(t, (*table.MetadataV1)(nil), meta)
+       assert.IsType(t, (*metadataV1)(nil), meta)
 
-       metaV2 := meta.(*table.MetadataV1).ToV2()
+       metaV2 := meta.(*metadataV1).ToV2()
        metaV2Json, err := json.Marshal(metaV2)
        require.NoError(t, err)
 
diff --git a/table/requirements.go b/table/requirements.go
new file mode 100644
index 0000000..5a4b883
--- /dev/null
+++ b/table/requirements.go
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "fmt"
+
+       "github.com/google/uuid"
+)
+
+const (
+       reqAssertCreate                  = "assert-create"
+       reqAssertTableUUID               = "assert-table-uuid"
+       reqAssertRefSnapshotID           = "assert-ref-snapshot-id"
+       reqAssertDefaultSpecID           = "assert-default-spec-id"
+       reqAssertCurrentSchemaID         = "assert-current-schema-id"
+       reqAssertDefaultSortOrderID      = "assert-default-sort-order-id"
+       reqAssertLastAssignedFieldID     = "assert-last-assigned-field-id"
+       reqAssertLastAssignedPartitionID = "assert-last-assigned-partition-id"
+)
+
// A Requirement is a validation rule that must be satisfied before attempting to
// make and commit changes to a table. Requirements are used to ensure that the
// table is in a valid state before making changes.
type Requirement interface {
	// Validate checks that the current table metadata satisfies the requirement.
	Validate(Metadata) error
}

// baseRequirement is a common struct that all requirements embed. It is used to
// identify the type of the requirement.
type baseRequirement struct {
	// Type is the requirement's discriminator when serialized to JSON,
	// e.g. "assert-table-uuid".
	Type string `json:"type"`
}
+
+type assertCreate struct {
+       baseRequirement
+}
+
+// AssertCreate creates a requirement that the table does not already exist.
+func AssertCreate() Requirement {
+       return &assertCreate{
+               baseRequirement: baseRequirement{Type: reqAssertCreate},
+       }
+}
+
+func (a *assertCreate) Validate(meta Metadata) error {
+       if meta != nil {
+               return fmt.Errorf("Table already exists")
+       }
+
+       return nil
+}
+
+type assertTableUuid struct {
+       baseRequirement
+       UUID uuid.UUID `json:"uuid"`
+}
+
+// AssertTableUUID creates a requirement that the table UUID matches the given 
UUID.
+func AssertTableUUID(uuid uuid.UUID) Requirement {
+       return &assertTableUuid{
+               baseRequirement: baseRequirement{Type: reqAssertTableUUID},
+               UUID:            uuid,
+       }
+}
+
+func (a *assertTableUuid) Validate(meta Metadata) error {
+       if meta == nil {
+               return fmt.Errorf("requirement failed: current table metadata 
does not exist")
+       }
+
+       if meta.TableUUID() != a.UUID {
+               return fmt.Errorf("UUID mismatch: %s != %s", meta.TableUUID(), 
a.UUID)
+       }
+
+       return nil
+}
+
+type assertRefSnapshotID struct {
+       baseRequirement
+       Ref        string `json:"ref"`
+       SnapshotID *int64 `json:"snapshot-id"`
+}
+
+// AssertRefSnapshotID creates a requirement which ensures that the table 
branch
+// or tag identified by the given ref must reference the given snapshot id.
+// If the id is nil, the ref must not already exist.
+func AssertRefSnapshotID(ref string, id *int64) Requirement {
+       return &assertRefSnapshotID{
+               baseRequirement: baseRequirement{Type: reqAssertRefSnapshotID},
+               Ref:             ref,
+               SnapshotID:      id,
+       }
+}
+
+func (a *assertRefSnapshotID) Validate(meta Metadata) error {
+       if meta == nil {
+               return fmt.Errorf("requirement failed: current table metadata 
does not exist")
+       }
+
+       var r *SnapshotRef
+       for name, ref := range meta.Refs() {
+               if name == a.Ref {
+                       r = &ref
+                       break
+               }
+       }
+       if r == nil {
+               return fmt.Errorf("requirement failed: branch or tag %s is 
missing, expected %d", a.Ref, a.SnapshotID)
+       }
+
+       if a.SnapshotID == nil {
+               return fmt.Errorf("requirement failed: %s %s was created 
concurrently", r.SnapshotRefType, a.Ref)
+       }
+
+       if r.SnapshotID != *a.SnapshotID {
+               return fmt.Errorf("requirement failed: %s %s has changed: 
expected id %d, found %d", r.SnapshotRefType, a.Ref, a.SnapshotID, r.SnapshotID)
+       }
+
+       return nil
+}
+
+type assertLastAssignedFieldId struct {
+       baseRequirement
+       LastAssignedFieldID int `json:"last-assigned-field-id"`
+}
+
+// AssertLastAssignedFieldID validates that the table's last assigned column ID
+// matches the given id.
+func AssertLastAssignedFieldID(id int) Requirement {
+       return &assertLastAssignedFieldId{
+               baseRequirement:     baseRequirement{Type: 
reqAssertLastAssignedFieldID},
+               LastAssignedFieldID: id,
+       }
+}
+
+func (a *assertLastAssignedFieldId) Validate(meta Metadata) error {
+       if meta == nil {
+               return fmt.Errorf("requirement failed: current table metadata 
does not exist")
+       }
+
+       if meta.LastColumnID() != a.LastAssignedFieldID {
+               return fmt.Errorf("requirement failed: last assigned field id 
has changed: expected %d, found %d", a.LastAssignedFieldID, meta.LastColumnID())
+       }
+
+       return nil
+}
+
+type assertCurrentSchemaId struct {
+       baseRequirement
+       CurrentSchemaID int `json:"current-schema-id"`
+}
+
+// AssertCurrentSchemaId creates a requirement that the table's current schema 
ID
+// matches the given id.
+func AssertCurrentSchemaID(id int) Requirement {
+       return &assertCurrentSchemaId{
+               baseRequirement: baseRequirement{Type: 
reqAssertCurrentSchemaID},
+               CurrentSchemaID: id,
+       }
+}
+
+func (a *assertCurrentSchemaId) Validate(meta Metadata) error {
+       if meta == nil {
+               return fmt.Errorf("requirement failed: current table metadata 
does not exist")
+       }
+
+       if meta.CurrentSchema().ID != a.CurrentSchemaID {
+               return fmt.Errorf("requirement failed: current schema id has 
changed: expected %d, found %d", a.CurrentSchemaID, meta.CurrentSchema().ID)
+       }
+
+       return nil
+}
+
+type assertLastAssignedPartitionId struct {
+       baseRequirement
+       LastAssignedPartitionID int `json:"last-assigned-partition-id"`
+}
+
+// AssertLastAssignedPartitionID creates a requriement that the table's last 
assigned partition ID
+// matches the given id.
+func AssertLastAssignedPartitionID(id int) Requirement {
+       return &assertLastAssignedPartitionId{
+               baseRequirement:         baseRequirement{Type: 
reqAssertLastAssignedPartitionID},
+               LastAssignedPartitionID: id,
+       }
+}
+
+func (a *assertLastAssignedPartitionId) Validate(meta Metadata) error {
+       if meta == nil {
+               return fmt.Errorf("requirement failed: current table metadata 
does not exist")
+       }
+
+       if *meta.LastPartitionSpecID() != a.LastAssignedPartitionID {
+               return fmt.Errorf("requirement failed: last assigned partition 
id has changed: expected %d, found %d", a.LastAssignedPartitionID, 
*meta.LastPartitionSpecID())
+       }
+
+       return nil
+}
+
+type assertDefaultSpecId struct {
+       baseRequirement
+       DefaultSpecID int `json:"default-spec-id"`
+}
+
+// AssertDefaultSpecID creates a requirement that the table's default 
partition spec ID
+// matches the given id.
+func AssertDefaultSpecID(id int) Requirement {
+       return &assertDefaultSpecId{
+               baseRequirement: baseRequirement{Type: reqAssertDefaultSpecID},
+               DefaultSpecID:   id,
+       }
+}
+
+func (a *assertDefaultSpecId) Validate(meta Metadata) error {
+       if meta == nil {
+               return fmt.Errorf("requirement failed: current table metadata 
does not exist")
+       }
+
+       if meta.DefaultPartitionSpec() != a.DefaultSpecID {
+               return fmt.Errorf("requirement failed: default spec id has 
changed: expected %d, found %d", a.DefaultSpecID, meta.DefaultPartitionSpec())
+       }
+
+       return nil
+}
+
+type assertDefaultSortOrderId struct {
+       baseRequirement
+       DefaultSortOrderID int `json:"default-sort-order-id"`
+}
+
+// AssertDefaultSortOrderID creates a requirement that the table's default 
sort order ID
+// matches the given id.
+func AssertDefaultSortOrderID(id int) Requirement {
+       return &assertDefaultSortOrderId{
+               baseRequirement:    baseRequirement{Type: 
reqAssertDefaultSortOrderID},
+               DefaultSortOrderID: id,
+       }
+}
+
+func (a *assertDefaultSortOrderId) Validate(meta Metadata) error {
+       if meta == nil {
+               return fmt.Errorf("requirement failed: current table metadata 
does not exist")
+       }
+
+       if meta.DefaultSortOrder() != a.DefaultSortOrderID {
+               return fmt.Errorf("requirement failed: default sort order id 
has changed: expected %d, found %d", a.DefaultSortOrderID, 
meta.DefaultSortOrder())
+       }
+
+       return nil
+}
diff --git a/table/table_test.go b/table/table_test.go
index cde94ab..f09054c 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -41,7 +41,7 @@ func (t *TableTestSuite) SetupSuite() {
        var mockfs internal.MockFS
        mockfs.Test(t.T())
        mockfs.On("Open", "s3://bucket/test/location/uuid.metadata.json").
-               Return(&internal.MockFile{Contents: 
bytes.NewReader([]byte(ExampleTableMetadataV2))}, nil)
+               Return(&internal.MockFile{Contents: 
bytes.NewReader([]byte(table.ExampleTableMetadataV2))}, nil)
        defer mockfs.AssertExpectations(t.T())
 
        tbl, err := table.NewFromLocation([]string{"foo"}, 
"s3://bucket/test/location/uuid.metadata.json", &mockfs)
@@ -59,7 +59,7 @@ func (t *TableTestSuite) TestNewTableFromReadFile() {
        var mockfsReadFile internal.MockFSReadFile
        mockfsReadFile.Test(t.T())
        mockfsReadFile.On("ReadFile", 
"s3://bucket/test/location/uuid.metadata.json").
-               Return([]byte(ExampleTableMetadataV2), nil)
+               Return([]byte(table.ExampleTableMetadataV2), nil)
        defer mockfsReadFile.AssertExpectations(t.T())
 
        tbl2, err := table.NewFromLocation([]string{"foo"}, 
"s3://bucket/test/location/uuid.metadata.json", &mockfsReadFile)
diff --git a/table/updates.go b/table/updates.go
new file mode 100644
index 0000000..7b09a0f
--- /dev/null
+++ b/table/updates.go
@@ -0,0 +1,379 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "fmt"
+
+       "github.com/apache/iceberg-go"
+       "github.com/google/uuid"
+)
+
// Action-name discriminators for table metadata updates; each value matches
// the `json:"action"` field emitted for the corresponding update type.
const (
	updateSpec             = "add-spec"
	updateAddSchema        = "add-schema"
	updateSortOrder        = "add-sort-order"
	updateAssignUUID       = "assign-uuid"
	updateDefaultSpec      = "set-default-spec"
	updateCurrentSchema    = "set-current-schema"
	updateDefaultSortOrder = "set-default-sort-order"
	updateUpgradeFormat    = "upgrade-format-version"

	// Declared here so every action name used in this file is listed in one
	// place; some constructors currently inline the equivalent literals.
	updateAddSnapshot       = "add-snapshot"
	updateSetSnapshotRef    = "set-snapshot-ref"
	updateSetLocation       = "set-location"
	updateSetProperties     = "set-properties"
	updateRemoveProperties  = "remove-properties"
	updateRemoveSnapshots   = "remove-snapshots"
	updateRemoveSnapshotRef = "remove-snapshot-ref"
)
+
// Update represents a single change to a table's metadata, such as adding a
// schema or setting the default sort order. Updates are applied against a
// MetadataBuilder to produce new metadata.
type Update interface {
	// Action returns the name of the action that the update represents,
	// e.g. "add-schema"; it is also the JSON "action" discriminator in the
	// update's serialized form.
	Action() string
	// Apply applies the update to the given metadata builder, returning an
	// error if the update cannot be applied.
	Apply(*MetadataBuilder) error
}
+
// baseUpdate carries the action name shared by every update. It is embedded
// by the concrete update structs so each serialized update can be identified
// by its "action" field.
type baseUpdate struct {
	ActionName string `json:"action"`
}

// Action returns the name identifying this update's action.
func (b *baseUpdate) Action() string {
	return b.ActionName
}
+
+type assignUUIDUpdate struct {
+       baseUpdate
+       UUID uuid.UUID `json:"uuid"`
+}
+
+// NewAssignUUIDUpdate creates a new update to assign a UUID to the table 
metadata.
+func NewAssignUUIDUpdate(uuid uuid.UUID) Update {
+       return &assignUUIDUpdate{
+               baseUpdate: baseUpdate{ActionName: updateAssignUUID},
+               UUID:       uuid,
+       }
+}
+
+func (u *assignUUIDUpdate) Apply(builder *MetadataBuilder) error {
+       _, err := builder.SetUUID(u.UUID)
+       return err
+}
+
+type upgradeFormatVersionUpdate struct {
+       baseUpdate
+       FormatVersion int `json:"format-version"`
+}
+
+// NewUpgradeFormatVersionUpdate creates a new update that upgrades the format 
version
+// of the table metadata to the given formatVersion.
+func NewUpgradeFormatVersionUpdate(formatVersion int) Update {
+       return &upgradeFormatVersionUpdate{
+               baseUpdate:    baseUpdate{ActionName: updateUpgradeFormat},
+               FormatVersion: formatVersion,
+       }
+}
+
+func (u *upgradeFormatVersionUpdate) Apply(builder *MetadataBuilder) error {
+       _, err := builder.SetFormatVersion(u.FormatVersion)
+       return err
+}
+
+type addSchemaUpdate struct {
+       baseUpdate
+       Schema       *iceberg.Schema `json:"schema"`
+       LastColumnID int             `json:"last-column-id"`
+       initial      bool
+}
+
+// NewAddSchemaUpdate creates a new update that adds the given schema and last 
column ID to
+// the table metadata. If the initial flag is set to true, the schema is 
considered the initial
+// schema of the table, and all previously added schemas in the metadata 
builder are removed.
+func NewAddSchemaUpdate(schema *iceberg.Schema, lastColumnID int, initial 
bool) Update {
+       return &addSchemaUpdate{
+               baseUpdate:   baseUpdate{ActionName: updateAddSchema},
+               Schema:       schema,
+               LastColumnID: lastColumnID,
+               initial:      initial,
+       }
+}
+
+func (u *addSchemaUpdate) Apply(builder *MetadataBuilder) error {
+       _, err := builder.AddSchema(u.Schema, u.LastColumnID, u.initial)
+       return err
+}
+
+type setCurrentSchemaUpdate struct {
+       baseUpdate
+       SchemaID int `json:"schema-id"`
+}
+
+// NewSetCurrentSchemaUpdate creates a new update that sets the current schema 
of the table
+// metadata to the given schema ID.
+func NewSetCurrentSchemaUpdate(id int) Update {
+       return &setCurrentSchemaUpdate{
+               baseUpdate: baseUpdate{ActionName: updateCurrentSchema},
+               SchemaID:   id,
+       }
+}
+
+func (u *setCurrentSchemaUpdate) Apply(builder *MetadataBuilder) error {
+       _, err := builder.SetCurrentSchemaID(u.SchemaID)
+       return err
+}
+
+type addPartitionSpecUpdate struct {
+       baseUpdate
+       Spec    *iceberg.PartitionSpec `json:"spec"`
+       initial bool
+}
+
+// NewAddPartitionSpecUpdate creates a new update that adds the given 
partition spec to the table
+// metadata. If the initial flag is set to true, the spec is considered the 
initial spec of the table,
+// and all other previously added specs in the metadata builder are removed.
+func NewAddPartitionSpecUpdate(spec *iceberg.PartitionSpec, initial bool) 
Update {
+       return &addPartitionSpecUpdate{
+               baseUpdate: baseUpdate{ActionName: updateSpec},
+               Spec:       spec,
+               initial:    initial,
+       }
+}
+
+func (u *addPartitionSpecUpdate) Apply(builder *MetadataBuilder) error {
+       _, err := builder.AddPartitionSpec(u.Spec, u.initial)
+       return err
+}
+
+type setDefaultSpecUpdate struct {
+       baseUpdate
+       SpecID int `json:"spec-id"`
+}
+
+// NewSetDefaultSpecUpdate creates a new update that sets the default 
partition spec of the
+// table metadata to the given spec ID.
+func NewSetDefaultSpecUpdate(id int) Update {
+       return &setDefaultSpecUpdate{
+               baseUpdate: baseUpdate{ActionName: updateDefaultSpec},
+               SpecID:     id,
+       }
+}
+
+func (u *setDefaultSpecUpdate) Apply(builder *MetadataBuilder) error {
+       _, err := builder.SetDefaultSpecID(u.SpecID)
+       return err
+}
+
+type addSortOrderUpdate struct {
+       baseUpdate
+       SortOrder *SortOrder `json:"sort-order"`
+       initial   bool
+}
+
+// NewAddSortOrderUpdate creates a new update that adds the given sort order 
to the table metadata.
+// If the initial flag is set to true, the sort order is considered the 
initial sort order of the table,
+// and all previously added sort orders in the metadata builder are removed.
+func NewAddSortOrderUpdate(sortOrder *SortOrder, initial bool) Update {
+       return &addSortOrderUpdate{
+               baseUpdate: baseUpdate{ActionName: updateSortOrder},
+               SortOrder:  sortOrder,
+               initial:    initial,
+       }
+}
+
+func (u *addSortOrderUpdate) Apply(builder *MetadataBuilder) error {
+       _, err := builder.AddSortOrder(u.SortOrder, u.initial)
+       return err
+}
+
+type setDefaultSortOrderUpdate struct {
+       baseUpdate
+       SortOrderID int `json:"sort-order-id"`
+}
+
+// NewSetDefaultSortOrderUpdate creates a new update that sets the default 
sort order of the table metadata
+// to the given sort order ID.
+func NewSetDefaultSortOrderUpdate(id int) Update {
+       return &setDefaultSortOrderUpdate{
+               baseUpdate:  baseUpdate{ActionName: updateDefaultSortOrder},
+               SortOrderID: id,
+       }
+}
+
+func (u *setDefaultSortOrderUpdate) Apply(builder *MetadataBuilder) error {
+       _, err := builder.SetDefaultSortOrderID(u.SortOrderID)
+       return err
+}
+
+type addSnapshotUpdate struct {
+       baseUpdate
+       Snapshot *Snapshot `json:"snapshot"`
+}
+
+// NewAddSnapshotUpdate creates a new update that adds the given snapshot to 
the table metadata.
+func NewAddSnapshotUpdate(snapshot *Snapshot) Update {
+       return &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: "add-snapshot"},
+               Snapshot:   snapshot,
+       }
+}
+
+func (u *addSnapshotUpdate) Apply(builder *MetadataBuilder) error {
+       _, err := builder.AddSnapshot(u.Snapshot)
+       return err
+}
+
// setSnapshotRefUpdate names a snapshot reference (branch or tag) and the
// retention settings to apply to it. The three retention fields use omitempty,
// so zero values are dropped from the JSON form even though Apply forwards
// them to the builder (see the >= 0 guards below).
type setSnapshotRefUpdate struct {
	baseUpdate
	RefName            string  `json:"ref-name"`
	RefType            RefType `json:"type"`
	SnapshotID         int64   `json:"snapshot-id"`
	MaxRefAgeMs        int64   `json:"max-ref-age-ms,omitempty"`
	MaxSnapshotAgeMs   int64   `json:"max-snapshot-age-ms,omitempty"`
	MinSnapshotsToKeep int     `json:"min-snapshots-to-keep,omitempty"`
}

// NewSetSnapshotRefUpdate creates a new update that sets the given snapshot reference
// as the current snapshot of the table metadata. MaxRefAgeMs, MaxSnapshotAgeMs,
// and MinSnapshotsToKeep are optional: negative values are ignored when the
// update is applied, while zero is treated as a set value and forwarded to the
// builder.
func NewSetSnapshotRefUpdate(
	name string,
	snapshotID int64,
	refType RefType,
	maxRefAgeMs, maxSnapshotAgeMs int64,
	minSnapshotsToKeep int,
) Update {
	return &setSnapshotRefUpdate{
		baseUpdate:         baseUpdate{ActionName: "set-snapshot-ref"},
		RefName:            name,
		RefType:            refType,
		SnapshotID:         snapshotID,
		MaxRefAgeMs:        maxRefAgeMs,
		MaxSnapshotAgeMs:   maxSnapshotAgeMs,
		MinSnapshotsToKeep: minSnapshotsToKeep,
	}
}

// Apply sets the snapshot reference on the builder. Each retention setting is
// only forwarded as an option when it is non-negative.
func (u *setSnapshotRefUpdate) Apply(builder *MetadataBuilder) error {
	opts := []setSnapshotRefOption{}
	if u.MaxRefAgeMs >= 0 {
		opts = append(opts, WithMaxRefAgeMs(u.MaxRefAgeMs))
	}
	if u.MaxSnapshotAgeMs >= 0 {
		opts = append(opts, WithMaxSnapshotAgeMs(u.MaxSnapshotAgeMs))
	}
	if u.MinSnapshotsToKeep >= 0 {
		opts = append(opts, WithMinSnapshotsToKeep(u.MinSnapshotsToKeep))
	}

	_, err := builder.SetSnapshotRef(
		u.RefName,
		u.SnapshotID,
		u.RefType,
		opts...,
	)
	return err
}
+
+type setLocationUpdate struct {
+       baseUpdate
+       Location string `json:"location"`
+}
+
+// NewSetLocationUpdate creates a new update that sets the location of the 
table metadata.
+func NewSetLocationUpdate(loc string) Update {
+       return &setLocationUpdate{
+               baseUpdate: baseUpdate{ActionName: "set-location"},
+               Location:   loc,
+       }
+}
+
+func (u *setLocationUpdate) Apply(builder *MetadataBuilder) error {
+       _, err := builder.SetLoc(u.Location)
+       return err
+}
+
+type setPropertiesUpdate struct {
+       baseUpdate
+       Updates iceberg.Properties `json:"updates"`
+}
+
+// NewSetPropertiesUpdate creates a new update that sets the given properties 
in the
+// table metadata.
+func NewSetPropertiesUpdate(updates iceberg.Properties) *setPropertiesUpdate {
+       return &setPropertiesUpdate{
+               baseUpdate: baseUpdate{ActionName: "set-properties"},
+               Updates:    updates,
+       }
+}
+
+func (u *setPropertiesUpdate) Apply(builder *MetadataBuilder) error {
+       _, err := builder.SetProperties(u.Updates)
+       return err
+}
+
+type removePropertiesUpdate struct {
+       baseUpdate
+       Removals []string `json:"removals"`
+}
+
+// NewRemovePropertiesUpdate creates a new update that removes properties from 
the table metadata.
+// The properties are identified by their names, and if a property with the 
given name does not exist,
+// it is ignored.
+func NewRemovePropertiesUpdate(removals []string) Update {
+       return &removePropertiesUpdate{
+               baseUpdate: baseUpdate{ActionName: "remove-properties"},
+               Removals:   removals,
+       }
+}
+
+func (u *removePropertiesUpdate) Apply(builder *MetadataBuilder) error {
+       _, err := builder.RemoveProperties(u.Removals)
+       return err
+}
+
+type removeSnapshotsUpdate struct {
+       baseUpdate
+       SnapshotIDs []int64 `json:"snapshot-ids"`
+}
+
+// NewRemoveSnapshotsUpdate creates a new update that removes all snapshots 
from
+// the table metadata with the given snapshot IDs.
+func NewRemoveSnapshotsUpdate(ids []int64) Update {
+       return &removeSnapshotsUpdate{
+               baseUpdate:  baseUpdate{ActionName: "remove-snapshots"},
+               SnapshotIDs: ids,
+       }
+}
+
+func (u *removeSnapshotsUpdate) Apply(builder *MetadataBuilder) error {
+       return fmt.Errorf("%w: remove-snapshots", iceberg.ErrNotImplemented)
+}
+
+type removeSnapshotRefUpdate struct {
+       baseUpdate
+       RefName string `json:"ref-name"`
+}
+
+// NewRemoveSnapshotRefUpdate creates a new update that removes a snapshot 
reference
+// from the table metadata.
+func NewRemoveSnapshotRefUpdate(ref string) *removeSnapshotRefUpdate {
+       return &removeSnapshotRefUpdate{
+               baseUpdate: baseUpdate{ActionName: "remove-snapshot-ref"},
+               RefName:    ref,
+       }
+}
+
+func (u *removeSnapshotRefUpdate) Apply(builder *MetadataBuilder) error {
+       return fmt.Errorf("%w: remove-snapshot-ref", iceberg.ErrNotImplemented)
+}

Reply via email to