This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 9c5f3e4 feat: Impl rest catalog + table updates & requirements
9c5f3e4 is described below
commit 9c5f3e4011dbba41d4f147a0a7fd656c77007d23
Author: jwtryg <[email protected]>
AuthorDate: Tue Jan 7 22:39:57 2025 +0100
feat: Impl rest catalog + table updates & requirements
Implement remaining REST catalog functions and basic framework for table
updates/requirements.
---
catalog/catalog.go | 3 +-
catalog/rest.go | 265 +++++++--
catalog/rest_test.go | 100 ++++
go.mod | 3 +-
go.sum | 8 +-
partitions.go | 6 +
table/metadata.go | 618 ++++++++++++++++++++-
...{metadata_test.go => metadata_internal_test.go} | 61 +-
table/requirements.go | 267 +++++++++
table/table_test.go | 4 +-
table/updates.go | 379 +++++++++++++
11 files changed, 1615 insertions(+), 99 deletions(-)
diff --git a/catalog/catalog.go b/catalog/catalog.go
index 65da7e5..9b5776d 100644
--- a/catalog/catalog.go
+++ b/catalog/catalog.go
@@ -45,6 +45,7 @@ var (
ErrNoSuchTable = errors.New("table does not exist")
ErrNoSuchNamespace = errors.New("namespace does not exist")
ErrNamespaceAlreadyExists = errors.New("namespace already exists")
+ ErrTableAlreadyExists = errors.New("table already exists")
)
// WithAwsConfig sets the AWS configuration for the catalog.
@@ -156,7 +157,7 @@ type Catalog interface {
ListTables(ctx context.Context, namespace table.Identifier)
([]table.Identifier, error)
// LoadTable loads a table from the catalog and returns a Table with
the metadata.
LoadTable(ctx context.Context, identifier table.Identifier, props
iceberg.Properties) (*table.Table, error)
- // DropTable tells the catalog to drop the table entirely
+ // DropTable tells the catalog to drop the table entirely.
DropTable(ctx context.Context, identifier table.Identifier) error
// RenameTable tells the catalog to rename a given table by the
identifiers
// provided, and then loads and returns the destination table
diff --git a/catalog/rest.go b/catalog/rest.go
index ef9c332..47be1eb 100644
--- a/catalog/rest.go
+++ b/catalog/rest.go
@@ -84,6 +84,86 @@ func (e errorResponse) Error() string {
return e.Type + ": " + e.Message
}
+type identifier struct {
+ Namespace []string `json:"namespace"`
+ Name string `json:"name"`
+}
+
+type commitTableResponse struct {
+ MetadataLoc string `json:"metadata-location"`
+ RawMetadata json.RawMessage `json:"metadata"`
+ Metadata table.Metadata `json:"-"`
+}
+
+func (t *commitTableResponse) UnmarshalJSON(b []byte) (err error) {
+ type Alias commitTableResponse
+ if err = json.Unmarshal(b, (*Alias)(t)); err != nil {
+ return err
+ }
+
+ t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata)
+ return
+}
+
+type loadTableResponse struct {
+ MetadataLoc string `json:"metadata-location"`
+ RawMetadata json.RawMessage `json:"metadata"`
+ Config iceberg.Properties `json:"config"`
+ Metadata table.Metadata `json:"-"`
+}
+
+func (t *loadTableResponse) UnmarshalJSON(b []byte) (err error) {
+ type Alias loadTableResponse
+ if err = json.Unmarshal(b, (*Alias)(t)); err != nil {
+ return err
+ }
+
+ t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata)
+ return
+}
+
+type createTableOption func(*createTableRequest)
+
+func WithLocation(loc string) createTableOption {
+ return func(req *createTableRequest) {
+ req.Location = strings.TrimRight(loc, "/")
+ }
+}
+
+func WithPartitionSpec(spec *iceberg.PartitionSpec) createTableOption {
+ return func(req *createTableRequest) {
+ req.PartitionSpec = spec
+ }
+}
+
+func WithWriteOrder(order *table.SortOrder) createTableOption {
+ return func(req *createTableRequest) {
+ req.WriteOrder = order
+ }
+}
+
+func WithStageCreate() createTableOption {
+ return func(req *createTableRequest) {
+ req.StageCreate = true
+ }
+}
+
+func WithProperties(props iceberg.Properties) createTableOption {
+ return func(req *createTableRequest) {
+ req.Props = props
+ }
+}
+
+type createTableRequest struct {
+ Name string `json:"name"`
+ Schema *iceberg.Schema `json:"schema"`
+ Location string `json:"location,omitempty"`
+ PartitionSpec *iceberg.PartitionSpec `json:"partition-spec,omitempty"`
+ WriteOrder *table.SortOrder `json:"write-order,omitempty"`
+ StageCreate bool `json:"stage-create,omitempty"`
+ Props iceberg.Properties `json:"properties,omitempty"`
+}
+
type oauthTokenResponse struct {
AccessToken string `json:"access_token"`
TokenType string `json:"token_type"`
@@ -537,6 +617,20 @@ func checkValidNamespace(ident table.Identifier) error {
return nil
}
+func (r *RestCatalog) tableFromResponse(identifier []string, metadata
table.Metadata, loc string, config iceberg.Properties) (*table.Table, error) {
+ id := identifier
+ if r.name != "" {
+ id = append([]string{r.name}, identifier...)
+ }
+
+ iofs, err := iceio.LoadFS(config, loc)
+ if err != nil {
+ return nil, err
+ }
+
+ return table.New(id, metadata, loc, iofs), nil
+}
+
func (r *RestCatalog) ListTables(ctx context.Context, namespace
table.Identifier) ([]table.Identifier, error) {
if err := checkValidNamespace(namespace); err != nil {
return nil, err
@@ -546,12 +640,8 @@ func (r *RestCatalog) ListTables(ctx context.Context,
namespace table.Identifier
path := []string{"namespaces", ns, "tables"}
type resp struct {
- Identifiers []struct {
- Namespace []string `json:"namespace"`
- Name string `json:"name"`
- } `json:"identifiers"`
+ Identifiers []identifier `json:"identifiers"`
}
-
rsp, err := doGet[resp](ctx, r.baseURI, path, r.cl,
map[int]error{http.StatusNotFound: ErrNoSuchNamespace})
if err != nil {
return nil, err
@@ -573,64 +663,156 @@ func splitIdentForPath(ident table.Identifier) (string,
string, error) {
return strings.Join(NamespaceFromIdent(ident), namespaceSeparator),
TableNameFromIdent(ident), nil
}
-type tblResponse struct {
- MetadataLoc string `json:"metadata-location"`
- RawMetadata json.RawMessage `json:"metadata"`
- Config iceberg.Properties `json:"config"`
- Metadata table.Metadata `json:"-"`
-}
+func (r *RestCatalog) CreateTable(ctx context.Context, identifier
table.Identifier, schema *iceberg.Schema, opts ...createTableOption)
(*table.Table, error) {
+ ns, tbl, err := splitIdentForPath(identifier)
+ if err != nil {
+ return nil, err
+ }
-func (t *tblResponse) UnmarshalJSON(b []byte) (err error) {
- type Alias tblResponse
- if err = json.Unmarshal(b, (*Alias)(t)); err != nil {
- return err
+ payload := createTableRequest{
+ Name: tbl,
+ Schema: schema,
+ }
+ for _, o := range opts {
+ o(&payload)
}
- t.Metadata, err = table.ParseMetadataBytes(t.RawMetadata)
- return
+ ret, err := doPost[createTableRequest, loadTableResponse](ctx,
r.baseURI, []string{"namespaces", ns, "tables"}, payload,
+ r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace,
http.StatusConflict: ErrTableAlreadyExists})
+ if err != nil {
+ return nil, err
+ }
+
+ config := maps.Clone(r.props)
+ maps.Copy(config, ret.Metadata.Properties())
+ maps.Copy(config, ret.Config)
+
+ return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc,
config)
}
-func (r *RestCatalog) LoadTable(ctx context.Context, identifier
table.Identifier, props iceberg.Properties) (*table.Table, error) {
+func (r *RestCatalog) RegisterTable(ctx context.Context, identifier
table.Identifier, metadataLoc string) (*table.Table, error) {
ns, tbl, err := splitIdentForPath(identifier)
if err != nil {
return nil, err
}
- if props == nil {
- props = iceberg.Properties{}
+ type payload struct {
+ Name string `json:"name"`
+ MetadataLoc string `json:"metadata-location"`
}
- ret, err := doGet[tblResponse](ctx, r.baseURI, []string{"namespaces",
ns, "tables", tbl},
- r.cl, map[int]error{http.StatusNotFound: ErrNoSuchTable})
+ ret, err := doPost[payload, loadTableResponse](ctx, r.baseURI,
[]string{"namespaces", ns, "tables", tbl},
+ payload{Name: tbl, MetadataLoc: metadataLoc}, r.cl,
map[int]error{http.StatusNotFound: ErrNoSuchNamespace, http.StatusConflict:
ErrTableAlreadyExists})
if err != nil {
return nil, err
}
- id := identifier
- if r.name != "" {
- id = append([]string{r.name}, identifier...)
+ config := maps.Clone(r.props)
+ maps.Copy(config, ret.Metadata.Properties())
+ maps.Copy(config, ret.Config)
+ return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc,
config)
+}
+
+func (r *RestCatalog) LoadTable(ctx context.Context, identifier
table.Identifier, props iceberg.Properties) (*table.Table, error) {
+ ns, tbl, err := splitIdentForPath(identifier)
+ if err != nil {
+ return nil, err
}
- tblProps := maps.Clone(r.props)
- maps.Copy(tblProps, props)
- maps.Copy(tblProps, ret.Metadata.Properties())
+ ret, err := doGet[loadTableResponse](ctx, r.baseURI,
[]string{"namespaces", ns, "tables", tbl},
+ r.cl, map[int]error{http.StatusNotFound: ErrNoSuchTable})
+ if err != nil {
+ return nil, err
+ }
+
+ config := maps.Clone(r.props)
+ maps.Copy(config, props)
+ maps.Copy(config, ret.Metadata.Properties())
for k, v := range ret.Config {
- tblProps[k] = v
+ config[k] = v
}
- iofs, err := iceio.LoadFS(tblProps, ret.MetadataLoc)
+ return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc,
config)
+}
+
+func (r *RestCatalog) UpdateTable(ctx context.Context, ident table.Identifier,
requirements []table.Requirement, updates []table.Update) (*table.Table, error)
{
+ ns, tbl, err := splitIdentForPath(ident)
if err != nil {
return nil, err
}
- return table.New(id, ret.Metadata, ret.MetadataLoc, iofs), nil
+
+ restIdentifier := identifier{
+ Namespace: NamespaceFromIdent(ident),
+ Name: tbl,
+ }
+ type payload struct {
+ Identifier identifier `json:"identifier"`
+ Requirements []table.Requirement `json:"requirements"`
+ Updates []table.Update `json:"updates"`
+ }
+ ret, err := doPost[payload, commitTableResponse](ctx, r.baseURI,
[]string{"namespaces", ns, "tables", tbl},
+ payload{Identifier: restIdentifier, Requirements: requirements,
Updates: updates}, r.cl,
+ map[int]error{http.StatusNotFound: ErrNoSuchTable,
http.StatusConflict: ErrCommitFailed})
+ if err != nil {
+ return nil, err
+ }
+
+ config := maps.Clone(r.props)
+ maps.Copy(config, ret.Metadata.Properties())
+
+ return r.tableFromResponse(ident, ret.Metadata, ret.MetadataLoc, config)
}
func (r *RestCatalog) DropTable(ctx context.Context, identifier
table.Identifier) error {
- return fmt.Errorf("%w: [Rest Catalog] drop table",
iceberg.ErrNotImplemented)
+ ns, tbl, err := splitIdentForPath(identifier)
+ if err != nil {
+ return err
+ }
+
+ _, err = doDelete[struct{}](ctx, r.baseURI, []string{"namespaces", ns,
"tables", tbl}, r.cl,
+ map[int]error{http.StatusNotFound: ErrNoSuchTable})
+
+ return err
+}
+
+func (r *RestCatalog) PurgeTable(ctx context.Context, identifier
table.Identifier) error {
+ ns, tbl, err := splitIdentForPath(identifier)
+ if err != nil {
+ return err
+ }
+
+ uri := r.baseURI.JoinPath("namespaces", ns, "tables", tbl)
+ v := url.Values{}
+ v.Set("purgeRequested", "true")
+ uri.RawQuery = v.Encode()
+
+ _, err = doDelete[struct{}](ctx, uri, []string{}, r.cl,
+ map[int]error{http.StatusNotFound: ErrNoSuchTable})
+
+ return err
}
func (r *RestCatalog) RenameTable(ctx context.Context, from, to
table.Identifier) (*table.Table, error) {
- return nil, fmt.Errorf("%w: [Rest Catalog] rename table",
iceberg.ErrNotImplemented)
+ type payload struct {
+ From identifier `json:"from"`
+ To identifier `json:"to"`
+ }
+ f := identifier{
+ Namespace: NamespaceFromIdent(from),
+ Name: TableNameFromIdent(from),
+ }
+ t := identifier{
+ Namespace: NamespaceFromIdent(to),
+ Name: TableNameFromIdent(to),
+ }
+
+ _, err := doPost[payload, any](ctx, r.baseURI, []string{"tables",
"rename"}, payload{From: f, To: t}, r.cl,
+ map[int]error{http.StatusNotFound: ErrNoSuchTable})
+ if err != nil {
+ return nil, err
+ }
+
+ return r.LoadTable(ctx, to, nil)
}
func (r *RestCatalog) CreateNamespace(ctx context.Context, namespace
table.Identifier, props iceberg.Properties) error {
@@ -710,3 +892,20 @@ func (r *RestCatalog) UpdateNamespaceProperties(ctx
context.Context, namespace t
return doPost[payload, PropertiesUpdateSummary](ctx, r.baseURI,
[]string{"namespaces", ns, "properties"},
payload{Remove: removals, Updates: updates}, r.cl,
map[int]error{http.StatusNotFound: ErrNoSuchNamespace})
}
+
+func (r *RestCatalog) CheckNamespaceExists(ctx context.Context, namespace
table.Identifier) (bool, error) {
+ if err := checkValidNamespace(namespace); err != nil {
+ return false, err
+ }
+
+ _, err := doGet[struct{}](ctx, r.baseURI, []string{"namespaces",
strings.Join(namespace, namespaceSeparator)},
+ r.cl, map[int]error{http.StatusNotFound: ErrNoSuchNamespace})
+ if err != nil {
+ if errors.Is(err, ErrNoSuchNamespace) {
+ return false, nil
+ }
+ return false, err
+ }
+
+ return true, nil
+}
diff --git a/catalog/rest_test.go b/catalog/rest_test.go
index 618c5e0..212d5a8 100644
--- a/catalog/rest_test.go
+++ b/catalog/rest_test.go
@@ -22,6 +22,7 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/json"
+ "fmt"
"net/http"
"net/http/httptest"
"net/url"
@@ -624,6 +625,105 @@ func (r *RestCatalogSuite) TestUpdateNamespaceProps404() {
r.ErrorContains(err, "Namespace does not exist: does_not_exist in
warehouse")
}
+var (
+ exampleTableMetadataNoSnapshotV1 = `{
+ "format-version": 1,
+ "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29",
+ "location": "s3://warehouse/database/table",
+ "last-updated-ms": 1657810967051,
+ "last-column-id": 3,
+ "schema": {
+ "type": "struct",
+ "schema-id": 0,
+ "identifier-field-ids": [2],
+ "fields": [
+ {"id": 1, "name": "foo", "required": false, "type":
"string"},
+ {"id": 2, "name": "bar", "required": true, "type":
"int"},
+ {"id": 3, "name": "baz", "required": false, "type":
"boolean"}
+ ]
+ },
+ "current-schema-id": 0,
+ "schemas": [
+ {
+ "type": "struct",
+ "schema-id": 0,
+ "identifier-field-ids": [2],
+ "fields": [
+ {"id": 1, "name": "foo", "required": false,
"type": "string"},
+ {"id": 2, "name": "bar", "required": true,
"type": "int"},
+ {"id": 3, "name": "baz", "required": false,
"type": "boolean"}
+ ]
+ }
+ ],
+ "partition-spec": [],
+ "default-spec-id": 0,
+ "last-partition-id": 999,
+ "default-sort-order-id": 0,
+ "sort-orders": [{"order-id": 0, "fields": []}],
+ "properties": {
+ "write.delete.parquet.compression-codec": "zstd",
+ "write.metadata.compression-codec": "gzip",
+ "write.summary.partition-limit": "100",
+ "write.parquet.compression-codec": "zstd"
+ },
+ "current-snapshot-id": -1,
+ "refs": {},
+ "snapshots": [],
+ "snapshot-log": [],
+ "metadata-log": []
+}`
+
+ createTableRestExample = fmt.Sprintf(`{
+ "metadata-location": "s3://warehouse/database/table/metadata.json",
+ "metadata": %s,
+ "config": {
+ "client.factory":
"io.tabular.iceberg.catalog.TabularAwsClientFactory",
+ "region": "us-west-2"
+ }
+}`, exampleTableMetadataNoSnapshotV1)
+
+ tableSchemaSimple = iceberg.NewSchemaWithIdentifiers(1, []int{2},
+ iceberg.NestedField{ID: 1, Name: "foo", Type:
iceberg.StringType{}, Required: false},
+ iceberg.NestedField{ID: 2, Name: "bar", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ iceberg.NestedField{ID: 3, Name: "baz", Type:
iceberg.PrimitiveTypes.Bool, Required: false},
+ )
+)
+
+func (r *RestCatalogSuite) TestCreateTable200() {
+ r.mux.HandleFunc("/v1/namespaces/fokko/tables", func(w
http.ResponseWriter, req *http.Request) {
+ r.Require().Equal(http.MethodPost, req.Method)
+
+ for k, v := range TestHeaders {
+ r.Equal(v, req.Header.Values(k))
+ }
+
+ w.Write([]byte(createTableRestExample))
+ })
+
+ t := createTableRestExample
+ _ = t
+ cat, err := catalog.NewRestCatalog("rest", r.srv.URL,
catalog.WithOAuthToken(TestToken))
+ r.Require().NoError(err)
+
+ tbl, err := cat.CreateTable(
+ context.Background(),
+ catalog.ToRestIdentifier("fokko", "fokko2"),
+ tableSchemaSimple,
+ )
+ r.Require().NoError(err)
+
+ r.Equal(catalog.ToRestIdentifier("rest", "fokko", "fokko2"),
tbl.Identifier())
+ r.Equal("s3://warehouse/database/table/metadata.json",
tbl.MetadataLocation())
+ r.EqualValues(1, tbl.Metadata().Version())
+ r.Equal("bf289591-dcc0-4234-ad4f-5c3eed811a29",
tbl.Metadata().TableUUID().String())
+ r.EqualValues(1657810967051, tbl.Metadata().LastUpdatedMillis())
+ r.Equal(3, tbl.Metadata().LastColumnID())
+ r.Zero(tbl.Schema().ID)
+ r.Zero(tbl.Metadata().DefaultPartitionSpec())
+ r.Equal(999, *tbl.Metadata().LastPartitionSpecID())
+ r.Equal(table.UnsortedSortOrder, tbl.SortOrder())
+}
+
func (r *RestCatalogSuite) TestLoadTable200() {
r.mux.HandleFunc("/v1/namespaces/fokko/tables/table", func(w
http.ResponseWriter, req *http.Request) {
r.Require().Equal(http.MethodGet, req.Method)
diff --git a/go.mod b/go.mod
index eb42b05..0c3f744 100644
--- a/go.mod
+++ b/go.mod
@@ -104,7 +104,8 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
- github.com/rivo/uniseg v0.4.4 // indirect
+ github.com/rivo/uniseg v0.4.7 // indirect
+ github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/substrait-io/substrait v0.57.1 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
diff --git a/go.sum b/go.sum
index 76872a9..2ecff2e 100644
--- a/go.sum
+++ b/go.sum
@@ -238,10 +238,10 @@ github.com/pterm/pterm v0.12.40/go.mod
h1:ffwPLwlbXxP+rxT0GsgDTzS3y3rmpAO1NMjUkG
github.com/pterm/pterm v0.12.80 h1:mM55B+GnKUnLMUSqhdINe4s6tOuVQIetQ3my8JGyAIg=
github.com/pterm/pterm v0.12.80/go.mod
h1:c6DeF9bSnOSeFPZlfs4ZRAFcf5SCoTwvwQ5xaKGQlHo=
github.com/rivo/uniseg v0.2.0/go.mod
h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
-github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
-github.com/rivo/uniseg v0.4.4/go.mod
h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
-github.com/rogpeppe/go-internal v1.9.0
h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
-github.com/rogpeppe/go-internal v1.9.0/go.mod
h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
+github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
+github.com/rivo/uniseg v0.4.7/go.mod
h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
+github.com/rogpeppe/go-internal v1.12.0
h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
+github.com/rogpeppe/go-internal v1.12.0/go.mod
h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
github.com/sergi/go-diff v1.2.0/go.mod
h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/stretchr/objx v0.1.0/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
diff --git a/partitions.go b/partitions.go
index 48b730c..9416d70 100644
--- a/partitions.go
+++ b/partitions.go
@@ -20,6 +20,7 @@ package iceberg
import (
"encoding/json"
"fmt"
+ "iter"
"slices"
"strings"
)
@@ -116,6 +117,11 @@ func (ps PartitionSpec) Equals(other PartitionSpec) bool {
return ps.id == other.id && slices.Equal(ps.fields, other.fields)
}
+// Fields returns an iterator over the partition fields in this spec.
+func (ps *PartitionSpec) Fields() iter.Seq[PartitionField] {
+ return slices.Values(ps.fields)
+}
+
func (ps PartitionSpec) MarshalJSON() ([]byte, error) {
if ps.fields == nil {
ps.fields = []PartitionField{}
diff --git a/table/metadata.go b/table/metadata.go
index 47b3ffe..73021c5 100644
--- a/table/metadata.go
+++ b/table/metadata.go
@@ -22,14 +22,26 @@ import (
"errors"
"fmt"
"io"
+ "iter"
"maps"
"slices"
+ "time"
"github.com/apache/iceberg-go"
"github.com/google/uuid"
)
+const (
+ partitionFieldStartID = 1000
+ supportedTableFormatVersion = 2
+
+ addPartionSpecAction = "add-partition-spec"
+ addSchemaAction = "add-schema"
+ addSnapshotAction = "add-snapshot"
+ addSortOrderAction = "add-sort-order"
+)
+
// Metadata for an iceberg table as specified in the Iceberg spec
//
// https://iceberg.apache.org/spec/#iceberg-table-spec
@@ -80,20 +92,542 @@ type Metadata interface {
SnapshotByName(name string) *Snapshot
// CurrentSnapshot returns the table's current snapshot.
CurrentSnapshot() *Snapshot
+ // Ref returns the snapshot ref for the main branch.
+ Ref() SnapshotRef
+ // Refs returns a list of snapshot name/reference pairs.
+ Refs() iter.Seq2[string, SnapshotRef]
+ // SnapshotLogs returns the list of snapshot logs for the table.
+ SnapshotLogs() iter.Seq[SnapshotLogEntry]
// SortOrder returns the table's current sort order, ie: the one with
the
// ID that matches the default-sort-order-id.
SortOrder() SortOrder
// SortOrders returns the list of sort orders in the table.
SortOrders() []SortOrder
+ // DefaultSortOrder returns the ID of the current sort order that
writers
+ // should use by default.
+ DefaultSortOrder() int
// Properties is a string to string map of table properties. This is
used
// to control settings that affect reading and writing and is not
intended
// to be used for arbitrary metadata. For example,
commit.retry.num-retries
// is used to control the number of commit retries.
Properties() iceberg.Properties
+ // PreviousFiles returns the list of metadata log entries for the table.
+ PreviousFiles() iter.Seq[MetadataLogEntry]
Equals(Metadata) bool
}
+type MetadataBuilder struct {
+ base Metadata
+ updates []Update
+
+ // common fields
+ formatVersion int
+ uuid uuid.UUID
+ loc string
+ lastUpdatedMS int64
+ lastColumnId int
+ schemaList []*iceberg.Schema
+ currentSchemaID int
+ specs []iceberg.PartitionSpec
+ defaultSpecID int
+ lastPartitionID *int
+ props iceberg.Properties
+ snapshotList []Snapshot
+ currentSnapshotID *int64
+ snapshotLog []SnapshotLogEntry
+ metadataLog []MetadataLogEntry
+ sortOrderList []SortOrder
+ defaultSortOrderID int
+ refs map[string]SnapshotRef
+
+ // V2 specific
+ lastSequenceNumber *int64
+}
+
+func NewMetadataBuilder() (*MetadataBuilder, error) {
+ return &MetadataBuilder{
+ updates: make([]Update, 0),
+ schemaList: make([]*iceberg.Schema, 0),
+ specs: make([]iceberg.PartitionSpec, 0),
+ props: make(iceberg.Properties),
+ snapshotList: make([]Snapshot, 0),
+ snapshotLog: make([]SnapshotLogEntry, 0),
+ metadataLog: make([]MetadataLogEntry, 0),
+ sortOrderList: make([]SortOrder, 0),
+ refs: make(map[string]SnapshotRef),
+ }, nil
+}
+
+func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) {
+ b := &MetadataBuilder{}
+ b.base = metadata
+
+ b.formatVersion = metadata.Version()
+ b.uuid = metadata.TableUUID()
+ b.loc = metadata.Location()
+ b.lastUpdatedMS = metadata.LastUpdatedMillis()
+ b.lastColumnId = metadata.LastColumnID()
+ b.schemaList = metadata.Schemas()
+ b.currentSchemaID = metadata.CurrentSchema().ID
+ b.specs = metadata.PartitionSpecs()
+ b.defaultSpecID = metadata.DefaultPartitionSpec()
+ b.lastPartitionID = metadata.LastPartitionSpecID()
+ b.props = metadata.Properties()
+ b.snapshotList = metadata.Snapshots()
+ b.currentSnapshotID = &metadata.CurrentSnapshot().SnapshotID
+ b.sortOrderList = metadata.SortOrders()
+ b.defaultSortOrderID = metadata.DefaultSortOrder()
+
+ b.refs = make(map[string]SnapshotRef)
+ for name, ref := range metadata.Refs() {
+ b.refs[name] = ref
+ }
+
+ b.snapshotLog = make([]SnapshotLogEntry, 0)
+ for log := range metadata.SnapshotLogs() {
+ b.snapshotLog = append(b.snapshotLog, log)
+ }
+
+ b.metadataLog = make([]MetadataLogEntry, 0)
+ for entry := range metadata.PreviousFiles() {
+ b.metadataLog = append(b.metadataLog, entry)
+ }
+
+ return b, nil
+}
+
+func (b *MetadataBuilder) AddSchema(schema *iceberg.Schema, newLastColumnID
int, initial bool) (*MetadataBuilder, error) {
+ if newLastColumnID < b.lastColumnId {
+ return nil, fmt.Errorf("%w: newLastColumnID %d, must be >= %d",
iceberg.ErrInvalidArgument, newLastColumnID, b.lastColumnId)
+ }
+
+ var schemas []*iceberg.Schema
+ if initial {
+ schemas = []*iceberg.Schema{schema}
+ } else {
+ schemas = append(b.schemaList, schema)
+ }
+
+ b.lastColumnId = newLastColumnID
+ b.schemaList = schemas
+ b.updates = append(b.updates, NewAddSchemaUpdate(schema,
newLastColumnID, initial))
+
+ return b, nil
+}
+
+func (b *MetadataBuilder) AddPartitionSpec(spec *iceberg.PartitionSpec,
initial bool) (*MetadataBuilder, error) {
+ for _, s := range b.specs {
+ if s.ID() == spec.ID() && !initial {
+ return nil, fmt.Errorf("partition spec with id %d
already exists", spec.ID())
+ }
+ }
+
+ maxFieldID := 0
+ for f := range spec.Fields() {
+ maxFieldID = max(maxFieldID, f.FieldID)
+ }
+
+ prev := partitionFieldStartID - 1
+ if b.lastPartitionID != nil {
+ prev = *b.lastPartitionID
+ }
+ lastPartitionID := max(maxFieldID, prev)
+
+ var specs []iceberg.PartitionSpec
+ if initial {
+ specs = []iceberg.PartitionSpec{*spec}
+ } else {
+ specs = append(b.specs, *spec)
+ }
+
+ b.specs = specs
+ b.lastPartitionID = &lastPartitionID
+ b.updates = append(b.updates, NewAddPartitionSpecUpdate(spec, initial))
+
+ return b, nil
+}
+
+func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) (*MetadataBuilder,
error) {
+ if snapshot == nil {
+ return nil, nil
+ }
+
+ if len(b.schemaList) == 0 {
+ return nil, errors.New("can't add snapshot with no added
schemas")
+ } else if len(b.specs) == 0 {
+ return nil, errors.New("can't add snapshot with no added
partition specs")
+ } else if s, _ := b.SnapshotByID(snapshot.SnapshotID); s != nil {
+ return nil, fmt.Errorf("can't add snapshot with id %d, already
exists", snapshot.SnapshotID)
+ } else if b.formatVersion == 2 &&
+ snapshot.SequenceNumber > 0 &&
+ snapshot.SequenceNumber <= *b.lastSequenceNumber &&
+ snapshot.ParentSnapshotID != nil {
+ return nil, fmt.Errorf("can't add snapshot with sequence number
%d, must be > than last sequence number %d",
+ snapshot.SequenceNumber, b.lastSequenceNumber)
+ }
+
+ b.updates = append(b.updates, NewAddSnapshotUpdate(snapshot))
+ b.lastUpdatedMS = snapshot.TimestampMs
+ b.lastSequenceNumber = &snapshot.SequenceNumber
+ b.snapshotList = append(b.snapshotList, *snapshot)
+ return b, nil
+}
+
+func (b *MetadataBuilder) AddSortOrder(sortOrder *SortOrder, initial bool)
(*MetadataBuilder, error) {
+ var sortOrders []SortOrder
+ if !initial {
+ sortOrders = append(sortOrders, b.sortOrderList...)
+ }
+
+ for _, s := range sortOrders {
+ if s.OrderID == sortOrder.OrderID {
+ return nil, fmt.Errorf("sort order with id %d already
exists", sortOrder.OrderID)
+ }
+ }
+
+ b.sortOrderList = append(sortOrders, *sortOrder)
+ b.updates = append(b.updates, NewAddSortOrderUpdate(sortOrder, initial))
+
+ return b, nil
+}
+
+func (b *MetadataBuilder) RemoveProperties(keys []string) (*MetadataBuilder,
error) {
+ if len(keys) == 0 {
+ return b, nil
+ }
+
+ b.updates = append(b.updates, NewRemovePropertiesUpdate(keys))
+ for _, key := range keys {
+ delete(b.props, key)
+ }
+
+ return b, nil
+}
+
+func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int)
(*MetadataBuilder, error) {
+ if currentSchemaID == -1 {
+ currentSchemaID = maxBy(b.schemaList, func(s *iceberg.Schema)
int {
+ return s.ID
+ })
+ if !slices.ContainsFunc(b.updates, func(u Update) bool {
+ return u.Action() == addSchemaAction &&
u.(*addSchemaUpdate).Schema.ID == currentSchemaID
+ }) {
+ return nil, errors.New("can't set current schema to
last added schema, no schema has been added")
+ }
+ }
+
+ if currentSchemaID == b.currentSchemaID {
+ return b, nil
+ }
+
+ _, err := b.GetSchemaByID(currentSchemaID)
+ if err != nil {
+ return nil, fmt.Errorf("can't set current schema to schema with
id %d: %w", currentSchemaID, err)
+ }
+
+ b.updates = append(b.updates,
NewSetCurrentSchemaUpdate(currentSchemaID))
+ b.currentSchemaID = currentSchemaID
+ return b, nil
+}
+
+func (b *MetadataBuilder) SetDefaultSortOrderID(defaultSortOrderID int)
(*MetadataBuilder, error) {
+ if defaultSortOrderID == -1 {
+ defaultSortOrderID = maxBy(b.sortOrderList, func(s SortOrder)
int {
+ return s.OrderID
+ })
+ if !slices.ContainsFunc(b.updates, func(u Update) bool {
+ return u.Action() == addSortOrderAction &&
u.(*addSortOrderUpdate).SortOrder.OrderID == defaultSortOrderID
+ }) {
+ return nil, fmt.Errorf("can't set default sort order to
last added with no added sort orders")
+ }
+ }
+
+ if defaultSortOrderID == b.defaultSortOrderID {
+ return b, nil
+ }
+
+ if _, err := b.GetSortOrderByID(defaultSortOrderID); err != nil {
+ return nil, fmt.Errorf("can't set default sort order to sort
order with id %d: %w", defaultSortOrderID, err)
+ }
+
+ b.updates = append(b.updates,
NewSetDefaultSortOrderUpdate(defaultSortOrderID))
+ b.defaultSortOrderID = defaultSortOrderID
+ return b, nil
+}
+
+func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID int)
(*MetadataBuilder, error) {
+ if defaultSpecID == -1 {
+ defaultSpecID = maxBy(b.specs, func(s iceberg.PartitionSpec)
int {
+ return s.ID()
+ })
+ if !slices.ContainsFunc(b.updates, func(u Update) bool {
+ return u.Action() == addPartionSpecAction &&
u.(*addPartitionSpecUpdate).Spec.ID() == defaultSpecID
+ }) {
+ return nil, fmt.Errorf("can't set default spec to last
added with no added partition specs")
+ }
+ }
+
+ if defaultSpecID == b.defaultSpecID {
+ return b, nil
+ }
+
+ if _, err := b.GetSpecByID(defaultSpecID); err != nil {
+ return nil, fmt.Errorf("can't set default spec to spec with id
%d: %w", defaultSpecID, err)
+ }
+
+ b.updates = append(b.updates, NewSetDefaultSpecUpdate(defaultSpecID))
+ b.defaultSpecID = defaultSpecID
+ return b, nil
+}
+
+func (b *MetadataBuilder) SetFormatVersion(formatVersion int)
(*MetadataBuilder, error) {
+ if formatVersion < b.formatVersion {
+ return nil, fmt.Errorf("downgrading format version from %d to
%d is not allowed",
+ b.formatVersion, formatVersion)
+ }
+
+ if formatVersion > supportedTableFormatVersion {
+ return nil, fmt.Errorf("unsupported format version %d",
formatVersion)
+ }
+
+ if formatVersion == b.formatVersion {
+ return b, nil
+ }
+
+ b.updates = append(b.updates,
NewUpgradeFormatVersionUpdate(formatVersion))
+ b.formatVersion = formatVersion
+ return b, nil
+}
+
+func (b *MetadataBuilder) SetLoc(loc string) (*MetadataBuilder, error) {
+ if b.loc == loc {
+ return b, nil
+ }
+
+ b.updates = append(b.updates, NewSetLocationUpdate(loc))
+ b.loc = loc
+ return b, nil
+}
+
+func (b *MetadataBuilder) SetProperties(props iceberg.Properties)
(*MetadataBuilder, error) {
+ if len(props) == 0 {
+ return b, nil
+ }
+
+ b.updates = append(b.updates, NewSetPropertiesUpdate(props))
+ maps.Copy(b.props, props)
+ return b, nil
+}
+
+type setSnapshotRefOption func(*SnapshotRef) error
+
+func WithMaxRefAgeMs(maxRefAgeMs int64) setSnapshotRefOption {
+ return func(ref *SnapshotRef) error {
+ if maxRefAgeMs <= 0 {
+ return fmt.Errorf("%w: maxRefAgeMs %d, must be > 0",
iceberg.ErrInvalidArgument, maxRefAgeMs)
+ }
+ ref.MaxRefAgeMs = &maxRefAgeMs
+ return nil
+ }
+}
+
+func WithMaxSnapshotAgeMs(maxSnapshotAgeMs int64) setSnapshotRefOption {
+ return func(ref *SnapshotRef) error {
+ if maxSnapshotAgeMs <= 0 {
+ return fmt.Errorf("%w: maxSnapshotAgeMs %d, must be >
0", iceberg.ErrInvalidArgument, maxSnapshotAgeMs)
+ }
+ ref.MaxSnapshotAgeMs = &maxSnapshotAgeMs
+ return nil
+ }
+}
+
+func WithMinSnapshotsToKeep(minSnapshotsToKeep int) setSnapshotRefOption {
+ return func(ref *SnapshotRef) error {
+ if minSnapshotsToKeep <= 0 {
+ return fmt.Errorf("%w: minSnapshotsToKeep %d, must be >
0", iceberg.ErrInvalidArgument, minSnapshotsToKeep)
+ }
+ ref.MinSnapshotsToKeep = &minSnapshotsToKeep
+ return nil
+ }
+}
+
+func (b *MetadataBuilder) SetSnapshotRef(
+ name string,
+ snapshotID int64,
+ refType RefType,
+ options ...setSnapshotRefOption,
+) (*MetadataBuilder, error) {
+ ref := SnapshotRef{
+ SnapshotID: snapshotID,
+ SnapshotRefType: refType,
+ }
+ for _, opt := range options {
+ if err := opt(&ref); err != nil {
+ return nil, fmt.Errorf("invalid snapshot ref option:
%w", err)
+ }
+ }
+
+ var maxRefAgeMs, maxSnapshotAgeMs int64
+ var minSnapshotsToKeep int
+ if ref.MaxRefAgeMs != nil {
+ maxRefAgeMs = *ref.MaxRefAgeMs
+ }
+ if ref.MaxSnapshotAgeMs != nil {
+ maxSnapshotAgeMs = *ref.MaxSnapshotAgeMs
+ }
+ if ref.MinSnapshotsToKeep != nil {
+ minSnapshotsToKeep = *ref.MinSnapshotsToKeep
+ }
+
+ if existingRef, ok := b.refs[name]; ok && existingRef.Equals(ref) {
+ return b, nil
+ }
+
+ snapshot, err := b.SnapshotByID(snapshotID)
+ if err != nil {
+ return nil, fmt.Errorf("can't set snapshot ref %s to unknown
snapshot %d: %w", name, snapshotID, err)
+ }
+
+ if refType == MainBranch {
+ b.updates = append(b.updates, NewSetSnapshotRefUpdate(name,
snapshotID, refType, maxRefAgeMs, maxSnapshotAgeMs, minSnapshotsToKeep))
+ b.currentSnapshotID = &snapshotID
+ b.snapshotLog = append(b.snapshotLog, SnapshotLogEntry{
+ SnapshotID: snapshotID,
+ TimestampMs: snapshot.TimestampMs,
+ })
+ b.lastUpdatedMS = time.Now().Local().UnixMilli()
+ }
+
+ if slices.ContainsFunc(b.updates, func(u Update) bool {
+ return u.Action() == addSnapshotAction &&
u.(*addSnapshotUpdate).Snapshot.SnapshotID == snapshotID
+ }) {
+ b.lastUpdatedMS = snapshot.TimestampMs
+ }
+
+ b.refs[name] = ref
+ return b, nil
+}
+
+func (b *MetadataBuilder) SetUUID(uuid uuid.UUID) (*MetadataBuilder, error) {
+ if b.uuid == uuid {
+ return b, nil
+ }
+
+ b.updates = append(b.updates, NewAssignUUIDUpdate(uuid))
+ b.uuid = uuid
+ return b, nil
+}
+
// buildCommonMetadata assembles the commonMetadata shared by the v1 and v2
// Build paths from the builder's current state.
//
// NOTE(review): slices and maps are shared with the builder, not cloned —
// the returned value must not be mutated independently of the builder.
func (b *MetadataBuilder) buildCommonMetadata() *commonMetadata {
	return &commonMetadata{
		FormatVersion:      b.formatVersion,
		UUID:               b.uuid,
		Loc:                b.loc,
		LastUpdatedMS:      b.lastUpdatedMS,
		LastColumnId:       b.lastColumnId,
		SchemaList:         b.schemaList,
		CurrentSchemaID:    b.currentSchemaID,
		Specs:              b.specs,
		DefaultSpecID:      b.defaultSpecID,
		LastPartitionID:    b.lastPartitionID,
		Props:              b.props,
		SnapshotList:       b.snapshotList,
		CurrentSnapshotID:  b.currentSnapshotID,
		SnapshotLog:        b.snapshotLog,
		MetadataLog:        b.metadataLog,
		SortOrderList:      b.sortOrderList,
		DefaultSortOrderID: b.defaultSortOrderID,
		SnapshotRefs:       b.refs,
	}
}
+
+func (b *MetadataBuilder) GetSchemaByID(id int) (*iceberg.Schema, error) {
+ for _, s := range b.schemaList {
+ if s.ID == id {
+ return s, nil
+ }
+ }
+
+ return nil, fmt.Errorf("%w: schema with id %d not found",
iceberg.ErrInvalidArgument, id)
+}
+
+func (b *MetadataBuilder) GetSpecByID(id int) (*iceberg.PartitionSpec, error) {
+ for _, s := range b.specs {
+ if s.ID() == id {
+ return &s, nil
+ }
+ }
+
+ return nil, fmt.Errorf("partition spec with id %d not found", id)
+}
+
+func (b *MetadataBuilder) GetSortOrderByID(id int) (*SortOrder, error) {
+ for _, s := range b.sortOrderList {
+ if s.OrderID == id {
+ return &s, nil
+ }
+ }
+
+ return nil, fmt.Errorf("sort order with id %d not found", id)
+}
+
+func (b *MetadataBuilder) SnapshotByID(id int64) (*Snapshot, error) {
+ for _, s := range b.snapshotList {
+ if s.SnapshotID == id {
+ return &s, nil
+ }
+ }
+
+ return nil, fmt.Errorf("snapshot with id %d not found", id)
+}
+
+func (b *MetadataBuilder) Build() (Metadata, error) {
+ common := b.buildCommonMetadata()
+ switch b.formatVersion {
+ case 1:
+ schema, err := b.GetSchemaByID(b.currentSchemaID)
+ if err != nil {
+ return nil, fmt.Errorf("can't build metadata, missing
schema for schema ID %d: %w", b.currentSchemaID, err)
+ }
+
+ partition, err := b.GetSpecByID(b.defaultSpecID)
+ if err != nil {
+ return nil, fmt.Errorf("can't build metadata, missing
partition spec for spec ID %d: %w", b.defaultSpecID, err)
+ }
+
+ partitionFields := make([]iceberg.PartitionField, 0)
+ for f := range partition.Fields() {
+ partitionFields = append(partitionFields, f)
+ }
+
+ return &metadataV1{
+ Schema: schema,
+ Partition: partitionFields,
+ commonMetadata: *common,
+ }, nil
+
+ case 2:
+ return &metadataV2{
+ LastSequenceNumber: *b.lastSequenceNumber,
+ commonMetadata: *common,
+ }, nil
+
+ default:
+ panic("unreachable: invalid format version")
+ }
+}
+
// maxBy returns the maximum of extract(e) over all elements of elems.
// It returns 0 when elems is empty (and also when every extracted value is
// negative, since the accumulator starts at zero).
func maxBy[S ~[]E, E any](elems S, extract func(e E) int) int {
	result := 0
	for _, elem := range elems {
		if v := extract(elem); v > result {
			result = v
		}
	}
	return result
}
+
var (
ErrInvalidMetadataFormatVersion = errors.New("invalid or missing
format-version in table metadata")
ErrInvalidMetadata = errors.New("invalid metadata")
@@ -128,9 +662,9 @@ func ParseMetadataBytes(b []byte) (Metadata, error) {
var ret Metadata
switch ver.FormatVersion {
case 1:
- ret = &MetadataV1{}
+ ret = &metadataV1{}
case 2:
- ret = &MetadataV2{}
+ ret = &metadataV2{}
default:
return nil, ErrInvalidMetadataFormatVersion
}
@@ -163,10 +697,28 @@ type commonMetadata struct {
MetadataLog []MetadataLogEntry `json:"metadata-log"`
SortOrderList []SortOrder `json:"sort-orders"`
DefaultSortOrderID int
`json:"default-sort-order-id"`
- Refs map[string]SnapshotRef `json:"refs"`
+ SnapshotRefs map[string]SnapshotRef `json:"refs"`
+}
+
+func (c *commonMetadata) Ref() SnapshotRef { return
c.SnapshotRefs[MainBranch] }
+func (c *commonMetadata) Refs() iter.Seq2[string, SnapshotRef] { return
maps.All(c.SnapshotRefs) }
+func (c *commonMetadata) SnapshotLogs() iter.Seq[SnapshotLogEntry] {
+ return slices.Values(c.SnapshotLog)
+}
+
+func (c *commonMetadata) PreviousFiles() iter.Seq[MetadataLogEntry] {
+ return slices.Values(c.MetadataLog)
}
func (c *commonMetadata) Equals(other *commonMetadata) bool {
+ if other == nil {
+ return false
+ }
+
+ if c == other {
+ return true
+ }
+
switch {
case c.LastPartitionID == nil && other.LastPartitionID != nil:
fallthrough
@@ -187,7 +739,7 @@ func (c *commonMetadata) Equals(other *commonMetadata) bool
{
fallthrough
case !maps.Equal(c.Props, other.Props):
fallthrough
- case !maps.EqualFunc(c.Refs, other.Refs, func(sr1, sr2 SnapshotRef)
bool { return sr1.Equals(sr2) }):
+ case !maps.EqualFunc(c.SnapshotRefs, other.SnapshotRefs, func(sr1, sr2
SnapshotRef) bool { return sr1.Equals(sr2) }):
return false
}
@@ -245,7 +797,7 @@ func (c *commonMetadata) SnapshotByID(id int64) *Snapshot {
}
func (c *commonMetadata) SnapshotByName(name string) *Snapshot {
- if ref, ok := c.Refs[name]; ok {
+ if ref, ok := c.SnapshotRefs[name]; ok {
return c.SnapshotByID(ref.SnapshotID)
}
return nil
@@ -268,6 +820,10 @@ func (c *commonMetadata) SortOrder() SortOrder {
return UnsortedSortOrder
}
+func (c *commonMetadata) DefaultSortOrder() int {
+ return c.DefaultSortOrderID
+}
+
func (c *commonMetadata) Properties() iceberg.Properties {
return c.Props
}
@@ -284,8 +840,8 @@ func (c *commonMetadata) preValidate() {
}
if c.CurrentSnapshotID != nil {
- if _, ok := c.Refs[MainBranch]; !ok {
- c.Refs[MainBranch] = SnapshotRef{
+ if _, ok := c.SnapshotRefs[MainBranch]; !ok {
+ c.SnapshotRefs[MainBranch] = SnapshotRef{
SnapshotID: *c.CurrentSnapshotID,
SnapshotRefType: BranchRef,
}
@@ -296,8 +852,8 @@ func (c *commonMetadata) preValidate() {
c.MetadataLog = []MetadataLogEntry{}
}
- if c.Refs == nil {
- c.Refs = make(map[string]SnapshotRef)
+ if c.SnapshotRefs == nil {
+ c.SnapshotRefs = make(map[string]SnapshotRef)
}
if c.SnapshotLog == nil {
@@ -370,26 +926,30 @@ func (c *commonMetadata) validate() error {
func (c *commonMetadata) Version() int { return c.FormatVersion }
-type MetadataV1 struct {
- Schema iceberg.Schema `json:"schema"`
+type metadataV1 struct {
+ Schema *iceberg.Schema `json:"schema"`
Partition []iceberg.PartitionField `json:"partition-spec"`
commonMetadata
}
-func (m *MetadataV1) Equals(other Metadata) bool {
- rhs, ok := other.(*MetadataV1)
+func (m *metadataV1) Equals(other Metadata) bool {
+ rhs, ok := other.(*metadataV1)
if !ok {
return false
}
- return m.Schema.Equals(&rhs.Schema) && slices.Equal(m.Partition,
rhs.Partition) &&
+ if m == rhs {
+ return true
+ }
+
+ return m.Schema.Equals(rhs.Schema) && slices.Equal(m.Partition,
rhs.Partition) &&
m.commonMetadata.Equals(&rhs.commonMetadata)
}
-func (m *MetadataV1) preValidate() {
- if len(m.SchemaList) == 0 {
- m.SchemaList = []*iceberg.Schema{&m.Schema}
+func (m *metadataV1) preValidate() {
+ if len(m.SchemaList) == 0 && m.Schema != nil {
+ m.SchemaList = []*iceberg.Schema{m.Schema}
}
if len(m.Specs) == 0 {
@@ -416,8 +976,8 @@ func (m *MetadataV1) preValidate() {
m.commonMetadata.preValidate()
}
-func (m *MetadataV1) UnmarshalJSON(b []byte) error {
- type Alias MetadataV1
+func (m *metadataV1) UnmarshalJSON(b []byte) error {
+ type Alias metadataV1
aux := (*Alias)(m)
if err := json.Unmarshal(b, aux); err != nil {
@@ -428,34 +988,38 @@ func (m *MetadataV1) UnmarshalJSON(b []byte) error {
return m.validate()
}
-func (m *MetadataV1) ToV2() MetadataV2 {
+func (m *metadataV1) ToV2() metadataV2 {
commonOut := m.commonMetadata
commonOut.FormatVersion = 2
if commonOut.UUID.String() == "" {
commonOut.UUID = uuid.New()
}
- return MetadataV2{commonMetadata: commonOut}
+ return metadataV2{commonMetadata: commonOut}
}
-type MetadataV2 struct {
- LastSequenceNumber int `json:"last-sequence-number"`
+type metadataV2 struct {
+ LastSequenceNumber int64 `json:"last-sequence-number"`
commonMetadata
}
-func (m *MetadataV2) Equals(other Metadata) bool {
- rhs, ok := other.(*MetadataV2)
+func (m *metadataV2) Equals(other Metadata) bool {
+ rhs, ok := other.(*metadataV2)
if !ok {
return false
}
+ if m == rhs {
+ return true
+ }
+
return m.LastSequenceNumber == rhs.LastSequenceNumber &&
m.commonMetadata.Equals(&rhs.commonMetadata)
}
-func (m *MetadataV2) UnmarshalJSON(b []byte) error {
- type Alias MetadataV2
+func (m *metadataV2) UnmarshalJSON(b []byte) error {
+ type Alias metadataV2
aux := (*Alias)(m)
if err := json.Unmarshal(b, aux); err != nil {
diff --git a/table/metadata_test.go b/table/metadata_internal_test.go
similarity index 93%
rename from table/metadata_test.go
rename to table/metadata_internal_test.go
index e268d88..a02ac7f 100644
--- a/table/metadata_test.go
+++ b/table/metadata_internal_test.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package table_test
+package table
import (
"encoding/json"
@@ -23,7 +23,6 @@ import (
"testing"
"github.com/apache/iceberg-go"
- "github.com/apache/iceberg-go/table"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -112,14 +111,14 @@ const ExampleTableMetadataV1 = `{
}`
func TestMetadataV1Parsing(t *testing.T) {
- meta, err := table.ParseMetadataBytes([]byte(ExampleTableMetadataV1))
+ meta, err := ParseMetadataBytes([]byte(ExampleTableMetadataV1))
require.NoError(t, err)
require.NotNil(t, meta)
- assert.IsType(t, (*table.MetadataV1)(nil), meta)
+ assert.IsType(t, (*metadataV1)(nil), meta)
assert.Equal(t, 1, meta.Version())
- data := meta.(*table.MetadataV1)
+ data := meta.(*metadataV1)
assert.Equal(t, uuid.MustParse("d20125c8-7284-442c-9aea-15fee620737c"),
meta.TableUUID())
assert.Equal(t, "s3://bucket/test/location", meta.Location())
assert.Equal(t, int64(1602638573874), meta.LastUpdatedMillis())
@@ -156,21 +155,21 @@ func TestMetadataV1Parsing(t *testing.T) {
assert.Nil(t, meta.SnapshotByID(0))
assert.Nil(t, meta.SnapshotByName("foo"))
assert.Zero(t, data.DefaultSortOrderID)
- assert.Equal(t, table.UnsortedSortOrder, meta.SortOrder())
+ assert.Equal(t, UnsortedSortOrder, meta.SortOrder())
}
func TestMetadataV2Parsing(t *testing.T) {
- meta, err := table.ParseMetadataBytes([]byte(ExampleTableMetadataV2))
+ meta, err := ParseMetadataBytes([]byte(ExampleTableMetadataV2))
require.NoError(t, err)
require.NotNil(t, meta)
- assert.IsType(t, (*table.MetadataV2)(nil), meta)
+ assert.IsType(t, (*metadataV2)(nil), meta)
assert.Equal(t, 2, meta.Version())
- data := meta.(*table.MetadataV2)
+ data := meta.(*metadataV2)
assert.Equal(t, uuid.MustParse("9c12d441-03fe-4693-9a96-a0705ddf69c1"),
data.UUID)
assert.Equal(t, "s3://bucket/test/location", data.Location())
- assert.Equal(t, 34, data.LastSequenceNumber)
+ assert.Equal(t, int64(34), data.LastSequenceNumber)
assert.Equal(t, int64(1602638573590), data.LastUpdatedMS)
assert.Equal(t, 3, data.LastColumnId)
assert.Equal(t, 0, data.SchemaList[0].ID)
@@ -192,7 +191,7 @@ func TestMetadataV2Parsing(t *testing.T) {
}
func TestParsingCorrectTypes(t *testing.T) {
- var meta table.MetadataV2
+ var meta metadataV2
require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV2),
&meta))
assert.IsType(t, &iceberg.Schema{}, meta.SchemaList[0])
@@ -201,7 +200,7 @@ func TestParsingCorrectTypes(t *testing.T) {
}
func TestSerializeMetadataV1(t *testing.T) {
- var meta table.MetadataV1
+ var meta metadataV1
require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV1),
&meta))
data, err := json.Marshal(&meta)
@@ -212,7 +211,7 @@ func TestSerializeMetadataV1(t *testing.T) {
}
func TestSerializeMetadataV2(t *testing.T) {
- var meta table.MetadataV2
+ var meta metadataV2
require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV2),
&meta))
data, err := json.Marshal(&meta)
@@ -243,9 +242,9 @@ func TestInvalidFormatVersion(t *testing.T) {
"snapshots": []
}`
- _, err := table.ParseMetadataBytes([]byte(metadataInvalidFormat))
+ _, err := ParseMetadataBytes([]byte(metadataInvalidFormat))
assert.Error(t, err)
- assert.ErrorIs(t, err, table.ErrInvalidMetadataFormatVersion)
+ assert.ErrorIs(t, err, ErrInvalidMetadataFormatVersion)
}
func TestCurrentSchemaNotFound(t *testing.T) {
@@ -278,9 +277,9 @@ func TestCurrentSchemaNotFound(t *testing.T) {
"snapshots": []
}`
- _, err := table.ParseMetadataBytes([]byte(schemaNotFound))
+ _, err := ParseMetadataBytes([]byte(schemaNotFound))
assert.Error(t, err)
- assert.ErrorIs(t, err, table.ErrInvalidMetadata)
+ assert.ErrorIs(t, err, ErrInvalidMetadata)
assert.ErrorContains(t, err, "current-schema-id 2 can't be found in any
schema")
}
@@ -322,9 +321,9 @@ func TestSortOrderNotFound(t *testing.T) {
"snapshots": []
}`
- _, err := table.ParseMetadataBytes([]byte(metadataSortOrderNotFound))
+ _, err := ParseMetadataBytes([]byte(metadataSortOrderNotFound))
assert.Error(t, err)
- assert.ErrorIs(t, err, table.ErrInvalidMetadata)
+ assert.ErrorIs(t, err, ErrInvalidMetadata)
assert.ErrorContains(t, err, "default-sort-order-id 4 can't be found in
[3: [\n2 asc nulls-first\nbucket[4](3) desc nulls-last\n]]")
}
@@ -358,10 +357,10 @@ func TestSortOrderUnsorted(t *testing.T) {
"snapshots": []
}`
- var meta table.MetadataV2
+ var meta metadataV2
require.NoError(t, json.Unmarshal([]byte(sortOrderUnsorted), &meta))
- assert.Equal(t, table.UnsortedSortOrderID, meta.DefaultSortOrderID)
+ assert.Equal(t, UnsortedSortOrderID, meta.DefaultSortOrderID)
assert.Len(t, meta.SortOrderList, 0)
}
@@ -394,28 +393,28 @@ func TestInvalidPartitionSpecID(t *testing.T) {
"last-partition-id": 1000
}`
- var meta table.MetadataV2
+ var meta metadataV2
err := json.Unmarshal([]byte(invalidSpecID), &meta)
- assert.ErrorIs(t, err, table.ErrInvalidMetadata)
+ assert.ErrorIs(t, err, ErrInvalidMetadata)
assert.ErrorContains(t, err, "default-spec-id 1 can't be found")
}
func TestV2RefCreation(t *testing.T) {
- var meta table.MetadataV2
+ var meta metadataV2
require.NoError(t, json.Unmarshal([]byte(ExampleTableMetadataV2),
&meta))
maxRefAge := int64(10000000)
- assert.Equal(t, map[string]table.SnapshotRef{
+ assert.Equal(t, map[string]SnapshotRef{
"main": {
SnapshotID: 3055729675574597004,
- SnapshotRefType: table.BranchRef,
+ SnapshotRefType: BranchRef,
},
"test": {
SnapshotID: 3051729675574597004,
- SnapshotRefType: table.TagRef,
+ SnapshotRefType: TagRef,
MaxRefAgeMs: &maxRefAge,
},
- }, meta.Refs)
+ }, meta.SnapshotRefs)
}
func TestV1WriteMetadataToV2(t *testing.T) {
@@ -453,11 +452,11 @@ func TestV1WriteMetadataToV2(t *testing.T) {
"snapshots": [{"snapshot-id": 1925, "timestamp-ms":
1602638573822}]
}`
- meta, err := table.ParseMetadataString(minimalV1Example)
+ meta, err := ParseMetadataString(minimalV1Example)
require.NoError(t, err)
- assert.IsType(t, (*table.MetadataV1)(nil), meta)
+ assert.IsType(t, (*metadataV1)(nil), meta)
- metaV2 := meta.(*table.MetadataV1).ToV2()
+ metaV2 := meta.(*metadataV1).ToV2()
metaV2Json, err := json.Marshal(metaV2)
require.NoError(t, err)
diff --git a/table/requirements.go b/table/requirements.go
new file mode 100644
index 0000000..5a4b883
--- /dev/null
+++ b/table/requirements.go
@@ -0,0 +1,267 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "fmt"
+
+ "github.com/google/uuid"
+)
+
// Requirement type identifiers; these are serialized as the "type" field of
// each requirement's JSON object (they match the identifiers used by the
// Iceberg REST catalog API — see the commit this file was introduced in).
const (
	reqAssertCreate                  = "assert-create"
	reqAssertTableUUID               = "assert-table-uuid"
	reqAssertRefSnapshotID           = "assert-ref-snapshot-id"
	reqAssertDefaultSpecID           = "assert-default-spec-id"
	reqAssertCurrentSchemaID         = "assert-current-schema-id"
	reqAssertDefaultSortOrderID      = "assert-default-sort-order-id"
	reqAssertLastAssignedFieldID     = "assert-last-assigned-field-id"
	reqAssertLastAssignedPartitionID = "assert-last-assigned-partition-id"
)

// A Requirement is a validation rule that must be satisfied before attempting to
// make and commit changes to a table. Requirements are used to ensure that the
// table is in a valid state before making changes.
type Requirement interface {
	// Validate checks that the current table metadata satisfies the requirement.
	Validate(Metadata) error
}

// baseRequirement is a common struct that all requirements embed. It is used to
// identify the type of the requirement.
type baseRequirement struct {
	// Type holds one of the reqAssert* identifiers above.
	Type string `json:"type"`
}
+
+type assertCreate struct {
+ baseRequirement
+}
+
+// AssertCreate creates a requirement that the table does not already exist.
+func AssertCreate() Requirement {
+ return &assertCreate{
+ baseRequirement: baseRequirement{Type: reqAssertCreate},
+ }
+}
+
+func (a *assertCreate) Validate(meta Metadata) error {
+ if meta != nil {
+ return fmt.Errorf("Table already exists")
+ }
+
+ return nil
+}
+
+type assertTableUuid struct {
+ baseRequirement
+ UUID uuid.UUID `json:"uuid"`
+}
+
+// AssertTableUUID creates a requirement that the table UUID matches the given
UUID.
+func AssertTableUUID(uuid uuid.UUID) Requirement {
+ return &assertTableUuid{
+ baseRequirement: baseRequirement{Type: reqAssertTableUUID},
+ UUID: uuid,
+ }
+}
+
+func (a *assertTableUuid) Validate(meta Metadata) error {
+ if meta == nil {
+ return fmt.Errorf("requirement failed: current table metadata
does not exist")
+ }
+
+ if meta.TableUUID() != a.UUID {
+ return fmt.Errorf("UUID mismatch: %s != %s", meta.TableUUID(),
a.UUID)
+ }
+
+ return nil
+}
+
+type assertRefSnapshotID struct {
+ baseRequirement
+ Ref string `json:"ref"`
+ SnapshotID *int64 `json:"snapshot-id"`
+}
+
+// AssertRefSnapshotID creates a requirement which ensures that the table
branch
+// or tag identified by the given ref must reference the given snapshot id.
+// If the id is nil, the ref must not already exist.
+func AssertRefSnapshotID(ref string, id *int64) Requirement {
+ return &assertRefSnapshotID{
+ baseRequirement: baseRequirement{Type: reqAssertRefSnapshotID},
+ Ref: ref,
+ SnapshotID: id,
+ }
+}
+
+func (a *assertRefSnapshotID) Validate(meta Metadata) error {
+ if meta == nil {
+ return fmt.Errorf("requirement failed: current table metadata
does not exist")
+ }
+
+ var r *SnapshotRef
+ for name, ref := range meta.Refs() {
+ if name == a.Ref {
+ r = &ref
+ break
+ }
+ }
+ if r == nil {
+ return fmt.Errorf("requirement failed: branch or tag %s is
missing, expected %d", a.Ref, a.SnapshotID)
+ }
+
+ if a.SnapshotID == nil {
+ return fmt.Errorf("requirement failed: %s %s was created
concurrently", r.SnapshotRefType, a.Ref)
+ }
+
+ if r.SnapshotID != *a.SnapshotID {
+ return fmt.Errorf("requirement failed: %s %s has changed:
expected id %d, found %d", r.SnapshotRefType, a.Ref, a.SnapshotID, r.SnapshotID)
+ }
+
+ return nil
+}
+
+type assertLastAssignedFieldId struct {
+ baseRequirement
+ LastAssignedFieldID int `json:"last-assigned-field-id"`
+}
+
+// AssertLastAssignedFieldID validates that the table's last assigned column ID
+// matches the given id.
+func AssertLastAssignedFieldID(id int) Requirement {
+ return &assertLastAssignedFieldId{
+ baseRequirement: baseRequirement{Type:
reqAssertLastAssignedFieldID},
+ LastAssignedFieldID: id,
+ }
+}
+
+func (a *assertLastAssignedFieldId) Validate(meta Metadata) error {
+ if meta == nil {
+ return fmt.Errorf("requirement failed: current table metadata
does not exist")
+ }
+
+ if meta.LastColumnID() != a.LastAssignedFieldID {
+ return fmt.Errorf("requirement failed: last assigned field id
has changed: expected %d, found %d", a.LastAssignedFieldID, meta.LastColumnID())
+ }
+
+ return nil
+}
+
+type assertCurrentSchemaId struct {
+ baseRequirement
+ CurrentSchemaID int `json:"current-schema-id"`
+}
+
+// AssertCurrentSchemaId creates a requirement that the table's current schema
ID
+// matches the given id.
+func AssertCurrentSchemaID(id int) Requirement {
+ return &assertCurrentSchemaId{
+ baseRequirement: baseRequirement{Type:
reqAssertCurrentSchemaID},
+ CurrentSchemaID: id,
+ }
+}
+
+func (a *assertCurrentSchemaId) Validate(meta Metadata) error {
+ if meta == nil {
+ return fmt.Errorf("requirement failed: current table metadata
does not exist")
+ }
+
+ if meta.CurrentSchema().ID != a.CurrentSchemaID {
+ return fmt.Errorf("requirement failed: current schema id has
changed: expected %d, found %d", a.CurrentSchemaID, meta.CurrentSchema().ID)
+ }
+
+ return nil
+}
+
+type assertLastAssignedPartitionId struct {
+ baseRequirement
+ LastAssignedPartitionID int `json:"last-assigned-partition-id"`
+}
+
+// AssertLastAssignedPartitionID creates a requriement that the table's last
assigned partition ID
+// matches the given id.
+func AssertLastAssignedPartitionID(id int) Requirement {
+ return &assertLastAssignedPartitionId{
+ baseRequirement: baseRequirement{Type:
reqAssertLastAssignedPartitionID},
+ LastAssignedPartitionID: id,
+ }
+}
+
+func (a *assertLastAssignedPartitionId) Validate(meta Metadata) error {
+ if meta == nil {
+ return fmt.Errorf("requirement failed: current table metadata
does not exist")
+ }
+
+ if *meta.LastPartitionSpecID() != a.LastAssignedPartitionID {
+ return fmt.Errorf("requirement failed: last assigned partition
id has changed: expected %d, found %d", a.LastAssignedPartitionID,
*meta.LastPartitionSpecID())
+ }
+
+ return nil
+}
+
+type assertDefaultSpecId struct {
+ baseRequirement
+ DefaultSpecID int `json:"default-spec-id"`
+}
+
+// AssertDefaultSpecID creates a requirement that the table's default
partition spec ID
+// matches the given id.
+func AssertDefaultSpecID(id int) Requirement {
+ return &assertDefaultSpecId{
+ baseRequirement: baseRequirement{Type: reqAssertDefaultSpecID},
+ DefaultSpecID: id,
+ }
+}
+
+func (a *assertDefaultSpecId) Validate(meta Metadata) error {
+ if meta == nil {
+ return fmt.Errorf("requirement failed: current table metadata
does not exist")
+ }
+
+ if meta.DefaultPartitionSpec() != a.DefaultSpecID {
+ return fmt.Errorf("requirement failed: default spec id has
changed: expected %d, found %d", a.DefaultSpecID, meta.DefaultPartitionSpec())
+ }
+
+ return nil
+}
+
+type assertDefaultSortOrderId struct {
+ baseRequirement
+ DefaultSortOrderID int `json:"default-sort-order-id"`
+}
+
+// AssertDefaultSortOrderID creates a requirement that the table's default
sort order ID
+// matches the given id.
+func AssertDefaultSortOrderID(id int) Requirement {
+ return &assertDefaultSortOrderId{
+ baseRequirement: baseRequirement{Type:
reqAssertDefaultSortOrderID},
+ DefaultSortOrderID: id,
+ }
+}
+
+func (a *assertDefaultSortOrderId) Validate(meta Metadata) error {
+ if meta == nil {
+ return fmt.Errorf("requirement failed: current table metadata
does not exist")
+ }
+
+ if meta.DefaultSortOrder() != a.DefaultSortOrderID {
+ return fmt.Errorf("requirement failed: default sort order id
has changed: expected %d, found %d", a.DefaultSortOrderID,
meta.DefaultSortOrder())
+ }
+
+ return nil
+}
diff --git a/table/table_test.go b/table/table_test.go
index cde94ab..f09054c 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -41,7 +41,7 @@ func (t *TableTestSuite) SetupSuite() {
var mockfs internal.MockFS
mockfs.Test(t.T())
mockfs.On("Open", "s3://bucket/test/location/uuid.metadata.json").
- Return(&internal.MockFile{Contents:
bytes.NewReader([]byte(ExampleTableMetadataV2))}, nil)
+ Return(&internal.MockFile{Contents:
bytes.NewReader([]byte(table.ExampleTableMetadataV2))}, nil)
defer mockfs.AssertExpectations(t.T())
tbl, err := table.NewFromLocation([]string{"foo"},
"s3://bucket/test/location/uuid.metadata.json", &mockfs)
@@ -59,7 +59,7 @@ func (t *TableTestSuite) TestNewTableFromReadFile() {
var mockfsReadFile internal.MockFSReadFile
mockfsReadFile.Test(t.T())
mockfsReadFile.On("ReadFile",
"s3://bucket/test/location/uuid.metadata.json").
- Return([]byte(ExampleTableMetadataV2), nil)
+ Return([]byte(table.ExampleTableMetadataV2), nil)
defer mockfsReadFile.AssertExpectations(t.T())
tbl2, err := table.NewFromLocation([]string{"foo"},
"s3://bucket/test/location/uuid.metadata.json", &mockfsReadFile)
diff --git a/table/updates.go b/table/updates.go
new file mode 100644
index 0000000..7b09a0f
--- /dev/null
+++ b/table/updates.go
@@ -0,0 +1,379 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "fmt"
+
+ "github.com/apache/iceberg-go"
+ "github.com/google/uuid"
+)
+
+const (
+ updateSpec = "add-spec"
+ updateAddSchema = "add-schema"
+ updateSortOrder = "add-sort-order"
+ updateAssignUUID = "assign-uuid"
+ updateDefaultSpec = "set-default-spec"
+ updateCurrentSchema = "set-current-schema"
+ updateDefaultSortOrder = "set-default-sort-order"
+ updateUpgradeFormat = "upgrade-format-version"
+)
+
+// Update represents a change to a table's metadata.
+type Update interface {
+ // Action returns the name of the action that the update represents.
+ Action() string
+ // Apply applies the update to the given metadata builder.
+ Apply(*MetadataBuilder) error
+}
+
+// baseUpdate contains the common fields for all updates. It is used to
identify the type
+// of the update.
+type baseUpdate struct {
+ ActionName string `json:"action"`
+}
+
+func (u *baseUpdate) Action() string {
+ return u.ActionName
+}
+
+type assignUUIDUpdate struct {
+ baseUpdate
+ UUID uuid.UUID `json:"uuid"`
+}
+
+// NewAssignUUIDUpdate creates a new update to assign a UUID to the table
metadata.
+func NewAssignUUIDUpdate(uuid uuid.UUID) Update {
+ return &assignUUIDUpdate{
+ baseUpdate: baseUpdate{ActionName: updateAssignUUID},
+ UUID: uuid,
+ }
+}
+
+func (u *assignUUIDUpdate) Apply(builder *MetadataBuilder) error {
+ _, err := builder.SetUUID(u.UUID)
+ return err
+}
+
+type upgradeFormatVersionUpdate struct {
+ baseUpdate
+ FormatVersion int `json:"format-version"`
+}
+
+// NewUpgradeFormatVersionUpdate creates a new update that upgrades the format
version
+// of the table metadata to the given formatVersion.
+func NewUpgradeFormatVersionUpdate(formatVersion int) Update {
+ return &upgradeFormatVersionUpdate{
+ baseUpdate: baseUpdate{ActionName: updateUpgradeFormat},
+ FormatVersion: formatVersion,
+ }
+}
+
+func (u *upgradeFormatVersionUpdate) Apply(builder *MetadataBuilder) error {
+ _, err := builder.SetFormatVersion(u.FormatVersion)
+ return err
+}
+
+type addSchemaUpdate struct {
+ baseUpdate
+ Schema *iceberg.Schema `json:"schema"`
+ LastColumnID int `json:"last-column-id"`
+ initial bool
+}
+
+// NewAddSchemaUpdate creates a new update that adds the given schema and last
column ID to
+// the table metadata. If the initial flag is set to true, the schema is
considered the initial
+// schema of the table, and all previously added schemas in the metadata
builder are removed.
+func NewAddSchemaUpdate(schema *iceberg.Schema, lastColumnID int, initial
bool) Update {
+ return &addSchemaUpdate{
+ baseUpdate: baseUpdate{ActionName: updateAddSchema},
+ Schema: schema,
+ LastColumnID: lastColumnID,
+ initial: initial,
+ }
+}
+
+func (u *addSchemaUpdate) Apply(builder *MetadataBuilder) error {
+ _, err := builder.AddSchema(u.Schema, u.LastColumnID, u.initial)
+ return err
+}
+
+type setCurrentSchemaUpdate struct {
+ baseUpdate
+ SchemaID int `json:"schema-id"`
+}
+
+// NewSetCurrentSchemaUpdate creates a new update that sets the current schema
of the table
+// metadata to the given schema ID.
+func NewSetCurrentSchemaUpdate(id int) Update {
+ return &setCurrentSchemaUpdate{
+ baseUpdate: baseUpdate{ActionName: updateCurrentSchema},
+ SchemaID: id,
+ }
+}
+
+func (u *setCurrentSchemaUpdate) Apply(builder *MetadataBuilder) error {
+ _, err := builder.SetCurrentSchemaID(u.SchemaID)
+ return err
+}
+
+type addPartitionSpecUpdate struct {
+ baseUpdate
+ Spec *iceberg.PartitionSpec `json:"spec"`
+ initial bool
+}
+
+// NewAddPartitionSpecUpdate creates a new update that adds the given
partition spec to the table
+// metadata. If the initial flag is set to true, the spec is considered the
initial spec of the table,
+// and all other previously added specs in the metadata builder are removed.
+func NewAddPartitionSpecUpdate(spec *iceberg.PartitionSpec, initial bool)
Update {
+ return &addPartitionSpecUpdate{
+ baseUpdate: baseUpdate{ActionName: updateSpec},
+ Spec: spec,
+ initial: initial,
+ }
+}
+
+func (u *addPartitionSpecUpdate) Apply(builder *MetadataBuilder) error {
+ _, err := builder.AddPartitionSpec(u.Spec, u.initial)
+ return err
+}
+
+type setDefaultSpecUpdate struct {
+ baseUpdate
+ SpecID int `json:"spec-id"`
+}
+
+// NewSetDefaultSpecUpdate creates a new update that sets the default
partition spec of the
+// table metadata to the given spec ID.
+func NewSetDefaultSpecUpdate(id int) Update {
+ return &setDefaultSpecUpdate{
+ baseUpdate: baseUpdate{ActionName: updateDefaultSpec},
+ SpecID: id,
+ }
+}
+
+func (u *setDefaultSpecUpdate) Apply(builder *MetadataBuilder) error {
+ _, err := builder.SetDefaultSpecID(u.SpecID)
+ return err
+}
+
+type addSortOrderUpdate struct {
+ baseUpdate
+ SortOrder *SortOrder `json:"sort-order"`
+ initial bool
+}
+
+// NewAddSortOrderUpdate creates a new update that adds the given sort order
to the table metadata.
+// If the initial flag is set to true, the sort order is considered the
initial sort order of the table,
+// and all previously added sort orders in the metadata builder are removed.
+func NewAddSortOrderUpdate(sortOrder *SortOrder, initial bool) Update {
+ return &addSortOrderUpdate{
+ baseUpdate: baseUpdate{ActionName: updateSortOrder},
+ SortOrder: sortOrder,
+ initial: initial,
+ }
+}
+
+func (u *addSortOrderUpdate) Apply(builder *MetadataBuilder) error {
+ _, err := builder.AddSortOrder(u.SortOrder, u.initial)
+ return err
+}
+
+type setDefaultSortOrderUpdate struct {
+ baseUpdate
+ SortOrderID int `json:"sort-order-id"`
+}
+
+// NewSetDefaultSortOrderUpdate creates a new update that sets the default
sort order of the table metadata
+// to the given sort order ID.
+func NewSetDefaultSortOrderUpdate(id int) Update {
+ return &setDefaultSortOrderUpdate{
+ baseUpdate: baseUpdate{ActionName: updateDefaultSortOrder},
+ SortOrderID: id,
+ }
+}
+
+func (u *setDefaultSortOrderUpdate) Apply(builder *MetadataBuilder) error {
+ _, err := builder.SetDefaultSortOrderID(u.SortOrderID)
+ return err
+}
+
+type addSnapshotUpdate struct {
+ baseUpdate
+ Snapshot *Snapshot `json:"snapshot"`
+}
+
+// NewAddSnapshotUpdate creates a new update that adds the given snapshot to
the table metadata.
+func NewAddSnapshotUpdate(snapshot *Snapshot) Update {
+ return &addSnapshotUpdate{
+ baseUpdate: baseUpdate{ActionName: "add-snapshot"},
+ Snapshot: snapshot,
+ }
+}
+
+func (u *addSnapshotUpdate) Apply(builder *MetadataBuilder) error {
+ _, err := builder.AddSnapshot(u.Snapshot)
+ return err
+}
+
+type setSnapshotRefUpdate struct {
+ baseUpdate
+ RefName string `json:"ref-name"`
+ RefType RefType `json:"type"`
+ SnapshotID int64 `json:"snapshot-id"`
+ MaxRefAgeMs int64 `json:"max-ref-age-ms,omitempty"`
+ MaxSnapshotAgeMs int64 `json:"max-snapshot-age-ms,omitempty"`
+ MinSnapshotsToKeep int `json:"min-snapshots-to-keep,omitempty"`
+}
+
+// NewSetSnapshotRefUpdate creates a new update that sets the given snapshot
reference
+// as the current snapshot of the table metadata. MaxRefAgeMs,
MaxSnapshotAgeMs,
+// and MinSnapshotsToKeep are optional, and any negative values are
ignored.
+func NewSetSnapshotRefUpdate(
+ name string,
+ snapshotID int64,
+ refType RefType,
+ maxRefAgeMs, maxSnapshotAgeMs int64,
+ minSnapshotsToKeep int,
+) Update {
+ return &setSnapshotRefUpdate{
+ baseUpdate: baseUpdate{ActionName: "set-snapshot-ref"},
+ RefName: name,
+ RefType: refType,
+ SnapshotID: snapshotID,
+ MaxRefAgeMs: maxRefAgeMs,
+ MaxSnapshotAgeMs: maxSnapshotAgeMs,
+ MinSnapshotsToKeep: minSnapshotsToKeep,
+ }
+}
+
+func (u *setSnapshotRefUpdate) Apply(builder *MetadataBuilder) error {
+ opts := []setSnapshotRefOption{}
+ if u.MaxRefAgeMs >= 0 {
+ opts = append(opts, WithMaxRefAgeMs(u.MaxRefAgeMs))
+ }
+ if u.MaxSnapshotAgeMs >= 0 {
+ opts = append(opts, WithMaxSnapshotAgeMs(u.MaxSnapshotAgeMs))
+ }
+ if u.MinSnapshotsToKeep >= 0 {
+ opts = append(opts,
WithMinSnapshotsToKeep(u.MinSnapshotsToKeep))
+ }
+
+ _, err := builder.SetSnapshotRef(
+ u.RefName,
+ u.SnapshotID,
+ u.RefType,
+ opts...,
+ )
+ return err
+}
+
+type setLocationUpdate struct {
+ baseUpdate
+ Location string `json:"location"`
+}
+
+// NewSetLocationUpdate creates a new update that sets the location of the
table metadata.
+func NewSetLocationUpdate(loc string) Update {
+ return &setLocationUpdate{
+ baseUpdate: baseUpdate{ActionName: "set-location"},
+ Location: loc,
+ }
+}
+
+func (u *setLocationUpdate) Apply(builder *MetadataBuilder) error {
+ _, err := builder.SetLoc(u.Location)
+ return err
+}
+
+type setPropertiesUpdate struct {
+ baseUpdate
+ Updates iceberg.Properties `json:"updates"`
+}
+
+// NewSetPropertiesUpdate creates a new update that sets the given properties
in the
+// table metadata.
+func NewSetPropertiesUpdate(updates iceberg.Properties) *setPropertiesUpdate {
+ return &setPropertiesUpdate{
+ baseUpdate: baseUpdate{ActionName: "set-properties"},
+ Updates: updates,
+ }
+}
+
+func (u *setPropertiesUpdate) Apply(builder *MetadataBuilder) error {
+ _, err := builder.SetProperties(u.Updates)
+ return err
+}
+
+type removePropertiesUpdate struct {
+ baseUpdate
+ Removals []string `json:"removals"`
+}
+
+// NewRemovePropertiesUpdate creates a new update that removes properties from
the table metadata.
+// The properties are identified by their names, and if a property with the
given name does not exist,
+// it is ignored.
+func NewRemovePropertiesUpdate(removals []string) Update {
+ return &removePropertiesUpdate{
+ baseUpdate: baseUpdate{ActionName: "remove-properties"},
+ Removals: removals,
+ }
+}
+
+func (u *removePropertiesUpdate) Apply(builder *MetadataBuilder) error {
+ _, err := builder.RemoveProperties(u.Removals)
+ return err
+}
+
+type removeSnapshotsUpdate struct {
+ baseUpdate
+ SnapshotIDs []int64 `json:"snapshot-ids"`
+}
+
+// NewRemoveSnapshotsUpdate creates a new update that removes the snapshots
with
+// the given snapshot IDs from the table metadata.
+func NewRemoveSnapshotsUpdate(ids []int64) Update {
+ return &removeSnapshotsUpdate{
+ baseUpdate: baseUpdate{ActionName: "remove-snapshots"},
+ SnapshotIDs: ids,
+ }
+}
+
+func (u *removeSnapshotsUpdate) Apply(builder *MetadataBuilder) error {
+ return fmt.Errorf("%w: remove-snapshots", iceberg.ErrNotImplemented)
+}
+
+type removeSnapshotRefUpdate struct {
+ baseUpdate
+ RefName string `json:"ref-name"`
+}
+
+// NewRemoveSnapshotRefUpdate creates a new update that removes a snapshot
reference
+// from the table metadata.
+func NewRemoveSnapshotRefUpdate(ref string) *removeSnapshotRefUpdate {
+ return &removeSnapshotRefUpdate{
+ baseUpdate: baseUpdate{ActionName: "remove-snapshot-ref"},
+ RefName: ref,
+ }
+}
+
+func (u *removeSnapshotRefUpdate) Apply(builder *MetadataBuilder) error {
+ return fmt.Errorf("%w: remove-snapshot-ref", iceberg.ErrNotImplemented)
+}