This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new b4d620a3 feat(rest): add support staged table creation in REST catalog
(#754)
b4d620a3 is described below
commit b4d620a316e29d4891304290519fb951fc8bab1d
Author: Lovro Mažgon <[email protected]>
AuthorDate: Thu Feb 26 17:27:45 2026 +0100
feat(rest): add support staged table creation in REST catalog (#754)
This PR adds support for two-phase staged table creation in a REST
catalog via the new `WithStagedUpdates` option. When staged updates are
present (e.g. `assign-uuid`, `add-snapshot`, `set-snapshot-ref`), the
REST catalog sends a create request with `stage-create=true`, then
commits via `CommitTable` with `assert-create` + all updates atomically.
There are also a few minor additional fixes included in this PR:
- Map HTTP status codes 500/502/503/504 on `CommitTable` and `UpdateTable` to
`ErrCommitStateUnknown` (matching the Java implementation's error mapping)
- Remove dead code in `CommitTable`
- Fix the `make lint-install` target (the golangci-lint module path was missing `/v2`)
---
Makefile | 2 +-
catalog/catalog.go | 18 +++
catalog/rest/rest.go | 85 ++++++++++++-
catalog/rest/rest_test.go | 303 ++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 401 insertions(+), 7 deletions(-)
diff --git a/Makefile b/Makefile
index 580d89d3..b5db6e79 100644
--- a/Makefile
+++ b/Makefile
@@ -26,7 +26,7 @@ lint:
golangci-lint run --timeout=10m
lint-install:
- go install
github.com/golangci/golangci-lint/cmd/golangci-lint@$(GOLANGCI_LINT_VERSION)
+ go install
github.com/golangci/golangci-lint/v2/cmd/golangci-lint@$(GOLANGCI_LINT_VERSION)
integration-setup:
docker compose -f internal/recipe/docker-compose.yml up -d
diff --git a/catalog/catalog.go b/catalog/catalog.go
index 487d9758..cc93e5f2 100644
--- a/catalog/catalog.go
+++ b/catalog/catalog.go
@@ -79,6 +79,12 @@ type CreateTableCfg struct {
commonCreateCfg
PartitionSpec *iceberg.PartitionSpec
SortOrder table.SortOrder
+ // StagedUpdates holds additional table.Update operations that cause
+ // the REST catalog to use a two-phase staged creation. Phase 1
+ // sends a minimal create with stage-create=true; phase 2 commits
+ // with an assert-create requirement and all updates atomically.
+ // Non-REST catalogs ignore this field.
+ StagedUpdates []table.Update
}
func NewCreateTableCfg() CreateTableCfg {
@@ -192,6 +198,18 @@ func WithProperties(props iceberg.Properties)
CreateTableOpt {
}
}
+// WithStagedUpdates provides additional table.Update operations that
+// cause the REST catalog to use two-phase staged creation. This is
+// useful for atomically creating a table with a custom UUID, initial
+// snapshots, or snapshot references. Use constructors from the table
+// package (e.g. table.NewAssignUUIDUpdate, table.NewAddSnapshotUpdate,
+// table.NewSetSnapshotRefUpdate).
+func WithStagedUpdates(updates ...table.Update) CreateTableOpt {
+ return func(cfg *CreateTableCfg) {
+ cfg.StagedUpdates = append(cfg.StagedUpdates, updates...)
+ }
+}
+
type CreateViewOpt func(*CreateViewCfg)
func WithViewLocation(location string) CreateViewOpt {
diff --git a/catalog/rest/rest.go b/catalog/rest/rest.go
index 1bd698e0..548efda4 100644
--- a/catalog/rest/rest.go
+++ b/catalog/rest/rest.go
@@ -721,13 +721,15 @@ func (r *Catalog) CreateTable(ctx context.Context,
identifier table.Identifier,
cfg.SortOrder = table.UnsortedSortOrder
}
+ stagedCreate := len(cfg.StagedUpdates) > 0
+
payload := createTableRequest{
Name: tbl,
Schema: schema,
Location: cfg.Location,
PartitionSpec: cfg.PartitionSpec,
WriteOrder: &cfg.SortOrder,
- StageCreate: false,
+ StageCreate: stagedCreate,
Props: cfg.Properties,
}
@@ -737,6 +739,15 @@ func (r *Catalog) CreateTable(ctx context.Context,
identifier table.Identifier,
return nil, err
}
+ if stagedCreate {
+ meta, metaLoc, err := r.commitStagedCreate(ctx, identifier,
ret.Metadata, cfg.StagedUpdates)
+ if err != nil {
+ return nil, err
+ }
+ ret.Metadata = meta
+ ret.MetadataLoc = metaLoc
+ }
+
config := maps.Clone(r.props)
maps.Copy(config, ret.Metadata.Properties())
maps.Copy(config, ret.Config)
@@ -744,6 +755,57 @@ func (r *Catalog) CreateTable(ctx context.Context,
identifier table.Identifier,
return r.tableFromResponse(ctx, identifier, ret.Metadata,
ret.MetadataLoc, config)
}
+// commitStagedCreate performs the second phase of a staged table
+// creation by committing the table with an assert-create requirement.
+// Changes are extracted from the phase 1 response metadata (which may
+// have been normalized by the server), matching the Java implementation's
+// createChanges approach. User-provided staged updates are appended
+// after server-derived changes so that user intent takes precedence.
+//
+// If phase 2 fails, the server is left with an orphaned staged table
+// (no automatic rollback).
+func (r *Catalog) commitStagedCreate(ctx context.Context, identifier
table.Identifier, meta table.Metadata, stagedUpdates []table.Update)
(table.Metadata, string, error) {
+ // Build updates from server-returned metadata, mirroring Java's
+ // RESTSessionCatalog.createChanges.
+ updates := make([]table.Update, 0, len(stagedUpdates)+10)
+ updates = append(updates,
+ table.NewAssignUUIDUpdate(meta.TableUUID()),
+ table.NewUpgradeFormatVersionUpdate(meta.Version()),
+ table.NewAddSchemaUpdate(meta.CurrentSchema()),
+ table.NewSetCurrentSchemaUpdate(-1),
+ )
+
+ spec := meta.PartitionSpec()
+ if !spec.IsUnpartitioned() {
+ updates = append(updates,
table.NewAddPartitionSpecUpdate(&spec, true))
+ } else {
+ updates = append(updates,
table.NewAddPartitionSpecUpdate(iceberg.UnpartitionedSpec, true))
+ }
+ updates = append(updates, table.NewSetDefaultSpecUpdate(-1))
+
+ order := meta.SortOrder()
+ if !order.IsUnsorted() {
+ updates = append(updates, table.NewAddSortOrderUpdate(&order))
+ } else {
+ updates = append(updates,
table.NewAddSortOrderUpdate(&table.UnsortedSortOrder))
+ }
+ updates = append(updates, table.NewSetDefaultSortOrderUpdate(-1))
+
+ if loc := meta.Location(); loc != "" {
+ updates = append(updates, table.NewSetLocationUpdate(loc))
+ }
+ if props := meta.Properties(); len(props) > 0 {
+ updates = append(updates, table.NewSetPropertiesUpdate(props))
+ }
+
+ // Append user-provided staged updates at the end.
+ updates = append(updates, stagedUpdates...)
+
+ requirements := []table.Requirement{table.AssertCreate()}
+
+ return r.CommitTable(ctx, identifier, requirements, updates)
+}
+
func (r *Catalog) CommitTable(ctx context.Context, ident table.Identifier,
requirements []table.Requirement, updates []table.Update) (table.Metadata,
string, error) {
ns, tblName, err := splitIdentForPath(ident)
if err != nil {
@@ -763,14 +825,18 @@ func (r *Catalog) CommitTable(ctx context.Context, ident
table.Identifier, requi
ret, err := doPost[payload, commitTableResponse](ctx, r.baseURI,
[]string{"namespaces", ns, "tables", tblName},
payload{Identifier: restIdentifier, Requirements: requirements,
Updates: updates}, r.cl,
- map[int]error{http.StatusNotFound: catalog.ErrNoSuchTable,
http.StatusConflict: ErrCommitFailed})
+ map[int]error{
+ http.StatusNotFound: catalog.ErrNoSuchTable,
+ http.StatusConflict: ErrCommitFailed,
+ http.StatusInternalServerError: ErrCommitStateUnknown,
+ http.StatusBadGateway: ErrCommitStateUnknown,
+ http.StatusServiceUnavailable: ErrCommitStateUnknown,
+ http.StatusGatewayTimeout: ErrCommitStateUnknown,
+ })
if err != nil {
return nil, "", err
}
- config := maps.Clone(r.props)
- maps.Copy(config, ret.Metadata.Properties())
-
return ret.Metadata, ret.MetadataLoc, nil
}
@@ -838,7 +904,14 @@ func (r *Catalog) UpdateTable(ctx context.Context, ident
table.Identifier, requi
}
ret, err := doPost[payload, commitTableResponse](ctx, r.baseURI,
[]string{"namespaces", ns, "tables", tbl},
payload{Identifier: restIdentifier, Requirements: requirements,
Updates: updates}, r.cl,
- map[int]error{http.StatusNotFound: catalog.ErrNoSuchTable,
http.StatusConflict: ErrCommitFailed})
+ map[int]error{
+ http.StatusNotFound: catalog.ErrNoSuchTable,
+ http.StatusConflict: ErrCommitFailed,
+ http.StatusInternalServerError: ErrCommitStateUnknown,
+ http.StatusBadGateway: ErrCommitStateUnknown,
+ http.StatusServiceUnavailable: ErrCommitStateUnknown,
+ http.StatusGatewayTimeout: ErrCommitStateUnknown,
+ })
if err != nil {
return nil, err
}
diff --git a/catalog/rest/rest_test.go b/catalog/rest/rest_test.go
index 67c6c0d5..4f6f7db4 100644
--- a/catalog/rest/rest_test.go
+++ b/catalog/rest/rest_test.go
@@ -2545,3 +2545,306 @@ func (r *RestTLSCatalogSuite)
TestCatalogWithCustomTransportAndTlsConfig() {
r.ErrorContains(err, "invalid catalog config with non-nil tlsConfig and
transport")
r.Nil(cat)
}
+
+// tableMetadataV1JSON returns a minimal valid V1 table metadata JSON
+// suitable for use in test server responses.
+func tableMetadataV1JSON(tableUUID string) string {
+ if tableUUID == "" {
+ tableUUID = "bf289591-dcc0-4234-ad4f-5c3eed811a29"
+ }
+
+ return fmt.Sprintf(`{
+ "format-version": 1,
+ "table-uuid": "%s",
+ "location": "s3://warehouse/db/tbl",
+ "last-updated-ms": 1657810967051,
+ "last-column-id": 3,
+ "schema": {
+ "type": "struct",
+ "schema-id": 0,
+ "fields": [
+ {"id": 1, "name": "id", "required": true,
"type": "long"},
+ {"id": 2, "name": "data", "required": false,
"type": "string"},
+ {"id": 3, "name": "ts", "required": false,
"type": "timestamp"}
+ ]
+ },
+ "current-schema-id": 0,
+ "schemas": [{
+ "type": "struct",
+ "schema-id": 0,
+ "fields": [
+ {"id": 1, "name": "id", "required": true,
"type": "long"},
+ {"id": 2, "name": "data", "required": false,
"type": "string"},
+ {"id": 3, "name": "ts", "required": false,
"type": "timestamp"}
+ ]
+ }],
+ "partition-spec": [],
+ "default-spec-id": 0,
+ "last-partition-id": 999,
+ "default-sort-order-id": 0,
+ "sort-orders": [{"order-id": 0, "fields": []}],
+ "properties": {},
+ "current-snapshot-id": -1,
+ "refs": {},
+ "snapshots": [],
+ "snapshot-log": [],
+ "metadata-log": []
+ }`, tableUUID)
+}
+
+func (r *RestCatalogSuite) TestCreateTableStaged() {
+ var createCalled bool
+ var commitCalled bool
+ var lastCreateBody map[string]any
+ var lastCommitBody map[string]any
+
+ r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, req
*http.Request) {
+ json.NewEncoder(w).Encode(map[string]any{
+ "access_token": TestToken, "token_type": "Bearer",
"expires_in": 3600,
+ })
+ })
+
+ // Phase 1: staged create POST
+ r.mux.HandleFunc("/v1/namespaces/db/tables", func(w
http.ResponseWriter, req *http.Request) {
+ if req.Method != http.MethodPost {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+
+ return
+ }
+ createCalled = true
+ json.NewDecoder(req.Body).Decode(&lastCreateBody)
+
+ w.WriteHeader(http.StatusOK)
+ w.Write([]byte(`{
+ "metadata-location":
"s3://warehouse/db/tbl/metadata/staged.json",
+ "metadata": ` + tableMetadataV1JSON("") + `
+ }`))
+ })
+
+ // Phase 2: commit POST — return metadata with the user's UUID
+ // to simulate the server applying the assign-uuid update.
+ testUUID := uuid.MustParse("12345678-1234-1234-1234-123456789abc")
+ r.mux.HandleFunc("/v1/namespaces/db/tables/test_table", func(w
http.ResponseWriter, req *http.Request) {
+ if req.Method != http.MethodPost {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+
+ return
+ }
+ commitCalled = true
+ json.NewDecoder(req.Body).Decode(&lastCommitBody)
+
+ w.WriteHeader(http.StatusOK)
+ w.Write([]byte(`{
+ "metadata-location":
"s3://warehouse/db/tbl/metadata/v1.json",
+ "metadata": ` + tableMetadataV1JSON(testUUID.String())
+ `
+ }`))
+ })
+
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
+ rest.WithCredential(TestCreds))
+ r.Require().NoError(err)
+
+ schema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "data", Type:
iceberg.PrimitiveTypes.String, Required: false},
+ )
+
+ tbl, err := cat.CreateTable(context.Background(),
+ table.Identifier{"db", "test_table"},
+ schema,
+ catalog.WithStagedUpdates(
+ table.NewAssignUUIDUpdate(testUUID),
+ ),
+ )
+ r.Require().NoError(err)
+ r.NotNil(tbl)
+
+ // Verify phase 1: create request had stage-create=true
+ r.True(createCalled, "create endpoint should be called")
+ stageCreate, _ := lastCreateBody["stage-create"].(bool)
+ r.True(stageCreate, "stage-create should be true")
+
+ // Verify payload
+ r.Equal("test_table", lastCreateBody["name"])
+ r.NotNil(lastCreateBody["schema"], "staged create should include
schema")
+ r.NotNil(lastCreateBody["write-order"], "staged create should include
write-order")
+
+ // Verify phase 2: commit request was sent
+ r.True(commitCalled, "commit endpoint should be called")
+
+ // Verify commit has assert-create requirement
+ requirements, ok := lastCommitBody["requirements"].([]any)
+ r.Require().True(ok, "commit should have requirements")
+ r.Require().Len(requirements, 1)
+ req0 := requirements[0].(map[string]any)
+ r.Equal("assert-create", req0["type"])
+
+ // Verify commit updates: server-derived first, user staged last.
+ updates, ok := lastCommitBody["updates"].([]any)
+ r.Require().True(ok, "commit should have updates")
+
+ actionNames := make([]string, len(updates))
+ for i, u := range updates {
+ actionNames[i] = u.(map[string]any)["action"].(string)
+ }
+
+ // Server-derived updates should come first
+ r.Contains(actionNames, "upgrade-format-version")
+ r.Contains(actionNames, "add-schema")
+ r.Contains(actionNames, "set-current-schema")
+ r.Contains(actionNames, "add-spec")
+ r.Contains(actionNames, "set-default-spec")
+ r.Contains(actionNames, "add-sort-order")
+ r.Contains(actionNames, "set-default-sort-order")
+ r.Contains(actionNames, "set-location")
+
+ // User staged update (assign-uuid with our UUID) should be last,
+ // so it takes precedence over the server-derived assign-uuid.
+ lastUpdate := updates[len(updates)-1].(map[string]any)
+ r.Equal("assign-uuid", lastUpdate["action"])
+ r.Equal(testUUID.String(), lastUpdate["uuid"])
+
+ // Verify exactly two assign-uuid entries (server + user) and
+ // user's comes last so it wins.
+ var assignUUIDCount int
+ for _, name := range actionNames {
+ if name == "assign-uuid" {
+ assignUUIDCount++
+ }
+ }
+ r.Equal(2, assignUUIDCount, "should have server and user assign-uuid")
+
+ // Verify the returned table has the user's UUID, not the server's.
+ r.Equal(testUUID, tbl.Metadata().TableUUID())
+}
+
+func (r *RestCatalogSuite) TestCreateTableNotStaged() {
+ var createBody map[string]any
+ var commitCalled bool
+
+ r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, req
*http.Request) {
+ json.NewEncoder(w).Encode(map[string]any{
+ "access_token": TestToken, "token_type": "Bearer",
"expires_in": 3600,
+ })
+ })
+
+ r.mux.HandleFunc("/v1/namespaces/db/tables", func(w
http.ResponseWriter, req *http.Request) {
+ json.NewDecoder(req.Body).Decode(&createBody)
+ w.WriteHeader(http.StatusOK)
+ w.Write([]byte(`{
+ "metadata-location":
"s3://warehouse/db/tbl/metadata/v1.json",
+ "metadata": ` + tableMetadataV1JSON("") + `
+ }`))
+ })
+
+ r.mux.HandleFunc("/v1/namespaces/db/tables/test_table", func(w
http.ResponseWriter, req *http.Request) {
+ commitCalled = true
+ w.WriteHeader(http.StatusOK)
+ })
+
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
+ rest.WithCredential(TestCreds))
+ r.Require().NoError(err)
+
+ schema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ )
+
+ _, err = cat.CreateTable(context.Background(),
+ table.Identifier{"db", "test_table"},
+ schema,
+ )
+ r.Require().NoError(err)
+
+ stageCreate, _ := createBody["stage-create"].(bool)
+ r.False(stageCreate, "stage-create should be false without staged
updates")
+ r.False(commitCalled, "commit should not be called for non-staged
create")
+}
+
+func (r *RestCatalogSuite) TestCommitTableErrCommitStateUnknown() {
+ var statusCode int
+
+ r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, req
*http.Request) {
+ json.NewEncoder(w).Encode(map[string]any{
+ "access_token": TestToken, "token_type": "Bearer",
"expires_in": 3600,
+ })
+ })
+
+ r.mux.HandleFunc("/v1/namespaces/db/tables/tbl", func(w
http.ResponseWriter, req *http.Request) {
+ w.WriteHeader(statusCode)
+ json.NewEncoder(w).Encode(map[string]any{
+ "error": map[string]any{
+ "message": http.StatusText(statusCode),
+ "type": "ServerException",
+ "code": statusCode,
+ },
+ })
+ })
+
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
+ rest.WithCredential(TestCreds))
+ r.Require().NoError(err)
+
+ for _, code := range []int{
+ http.StatusInternalServerError,
+ http.StatusBadGateway,
+ http.StatusServiceUnavailable,
+ http.StatusGatewayTimeout,
+ } {
+ statusCode = code
+ r.Run(strconv.Itoa(code), func() {
+ _, _, err := cat.CommitTable(context.Background(),
+ table.Identifier{"db", "tbl"},
+ []table.Requirement{},
+ []table.Update{},
+ )
+ r.Require().Error(err)
+ r.ErrorIs(err, rest.ErrCommitStateUnknown,
+ "%d should return ErrCommitStateUnknown, got:
%v", code, err)
+ })
+ }
+}
+
+func (r *RestCatalogSuite) TestUpdateTableErrCommitStateUnknown() {
+ var statusCode int
+
+ r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, req
*http.Request) {
+ json.NewEncoder(w).Encode(map[string]any{
+ "access_token": TestToken, "token_type": "Bearer",
"expires_in": 3600,
+ })
+ })
+
+ r.mux.HandleFunc("/v1/namespaces/db/tables/tbl", func(w
http.ResponseWriter, req *http.Request) {
+ w.WriteHeader(statusCode)
+ json.NewEncoder(w).Encode(map[string]any{
+ "error": map[string]any{
+ "message": http.StatusText(statusCode),
+ "type": "ServerException",
+ "code": statusCode,
+ },
+ })
+ })
+
+ cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
+ rest.WithCredential(TestCreds))
+ r.Require().NoError(err)
+
+ for _, code := range []int{
+ http.StatusInternalServerError,
+ http.StatusBadGateway,
+ http.StatusServiceUnavailable,
+ http.StatusGatewayTimeout,
+ } {
+ statusCode = code
+ r.Run(strconv.Itoa(code), func() {
+ _, err := cat.UpdateTable(context.Background(),
+ table.Identifier{"db", "tbl"},
+ []table.Requirement{},
+ []table.Update{},
+ )
+ r.Require().Error(err)
+ r.ErrorIs(err, rest.ErrCommitStateUnknown,
+ "%d should return ErrCommitStateUnknown, got:
%v", code, err)
+ })
+ }
+}