This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new b4d620a3 feat(rest): add support staged table creation in REST catalog 
(#754)
b4d620a3 is described below

commit b4d620a316e29d4891304290519fb951fc8bab1d
Author: Lovro Mažgon <[email protected]>
AuthorDate: Thu Feb 26 17:27:45 2026 +0100

    feat(rest): add support staged table creation in REST catalog (#754)
    
    This PR adds support for two-phase staged table creation in a REST
    catalog via the new `WithStagedUpdates` option. When staged updates are
    present (e.g. `assign-uuid`, `add-snapshot`, `set-snapshot-ref`), the
    REST catalog sends a create request with `stage-create=true`, then
    commits via `CommitTable` with `assert-create` + all updates atomically.
    
    There are a few minor additional fixes included in this PR:
    - Map HTTP status 500/502/503/504 on `CommitTable` and `UpdateTable` to
    `ErrCommitStateUnknown` (matching Java's error mapping)
    - Remove dead code in `CommitTable`
    - Fix `make lint-install` target (missing v2)
---
 Makefile                  |   2 +-
 catalog/catalog.go        |  18 +++
 catalog/rest/rest.go      |  85 ++++++++++++-
 catalog/rest/rest_test.go | 303 ++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 401 insertions(+), 7 deletions(-)

diff --git a/Makefile b/Makefile
index 580d89d3..b5db6e79 100644
--- a/Makefile
+++ b/Makefile
@@ -26,7 +26,7 @@ lint:
        golangci-lint run --timeout=10m
 
 lint-install:
-       go install 
github.com/golangci/golangci-lint/cmd/golangci-lint@$(GOLANGCI_LINT_VERSION)
+       go install 
github.com/golangci/golangci-lint/v2/cmd/golangci-lint@$(GOLANGCI_LINT_VERSION)
 
 integration-setup:
        docker compose -f internal/recipe/docker-compose.yml up -d
diff --git a/catalog/catalog.go b/catalog/catalog.go
index 487d9758..cc93e5f2 100644
--- a/catalog/catalog.go
+++ b/catalog/catalog.go
@@ -79,6 +79,12 @@ type CreateTableCfg struct {
        commonCreateCfg
        PartitionSpec *iceberg.PartitionSpec
        SortOrder     table.SortOrder
+       // StagedUpdates holds additional table.Update operations that cause
+       // the REST catalog to use a two-phase staged creation. Phase 1
+       // sends a minimal create with stage-create=true; phase 2 commits
+       // with an assert-create requirement and all updates atomically.
+       // Non-REST catalogs ignore this field.
+       StagedUpdates []table.Update
 }
 
 func NewCreateTableCfg() CreateTableCfg {
@@ -192,6 +198,18 @@ func WithProperties(props iceberg.Properties) 
CreateTableOpt {
        }
 }
 
+// WithStagedUpdates provides additional table.Update operations that
+// cause the REST catalog to use two-phase staged creation. This is
+// useful for atomically creating a table with a custom UUID, initial
+// snapshots, or snapshot references. Use constructors from the table
+// package (e.g. table.NewAssignUUIDUpdate, table.NewAddSnapshotUpdate,
+// table.NewSetSnapshotRefUpdate).
+func WithStagedUpdates(updates ...table.Update) CreateTableOpt {
+       return func(cfg *CreateTableCfg) {
+               cfg.StagedUpdates = append(cfg.StagedUpdates, updates...)
+       }
+}
+
 type CreateViewOpt func(*CreateViewCfg)
 
 func WithViewLocation(location string) CreateViewOpt {
diff --git a/catalog/rest/rest.go b/catalog/rest/rest.go
index 1bd698e0..548efda4 100644
--- a/catalog/rest/rest.go
+++ b/catalog/rest/rest.go
@@ -721,13 +721,15 @@ func (r *Catalog) CreateTable(ctx context.Context, 
identifier table.Identifier,
                cfg.SortOrder = table.UnsortedSortOrder
        }
 
+       stagedCreate := len(cfg.StagedUpdates) > 0
+
        payload := createTableRequest{
                Name:          tbl,
                Schema:        schema,
                Location:      cfg.Location,
                PartitionSpec: cfg.PartitionSpec,
                WriteOrder:    &cfg.SortOrder,
-               StageCreate:   false,
+               StageCreate:   stagedCreate,
                Props:         cfg.Properties,
        }
 
@@ -737,6 +739,15 @@ func (r *Catalog) CreateTable(ctx context.Context, 
identifier table.Identifier,
                return nil, err
        }
 
+       if stagedCreate {
+               meta, metaLoc, err := r.commitStagedCreate(ctx, identifier, 
ret.Metadata, cfg.StagedUpdates)
+               if err != nil {
+                       return nil, err
+               }
+               ret.Metadata = meta
+               ret.MetadataLoc = metaLoc
+       }
+
        config := maps.Clone(r.props)
        maps.Copy(config, ret.Metadata.Properties())
        maps.Copy(config, ret.Config)
@@ -744,6 +755,57 @@ func (r *Catalog) CreateTable(ctx context.Context, 
identifier table.Identifier,
        return r.tableFromResponse(ctx, identifier, ret.Metadata, 
ret.MetadataLoc, config)
 }
 
+// commitStagedCreate performs the second phase of a staged table
+// creation by committing the table with an assert-create requirement.
+// Changes are extracted from the phase 1 response metadata (which may
+// have been normalized by the server), matching the Java implementation's
+// createChanges approach. User-provided staged updates are appended
+// after server-derived changes so that user intent takes precedence.
+//
+// If phase 2 fails, the server is left with an orphaned staged table
+// (no automatic rollback).
+func (r *Catalog) commitStagedCreate(ctx context.Context, identifier 
table.Identifier, meta table.Metadata, stagedUpdates []table.Update) 
(table.Metadata, string, error) {
+       // Build updates from server-returned metadata, mirroring Java's
+       // RESTSessionCatalog.createChanges.
+       updates := make([]table.Update, 0, len(stagedUpdates)+10)
+       updates = append(updates,
+               table.NewAssignUUIDUpdate(meta.TableUUID()),
+               table.NewUpgradeFormatVersionUpdate(meta.Version()),
+               table.NewAddSchemaUpdate(meta.CurrentSchema()),
+               table.NewSetCurrentSchemaUpdate(-1),
+       )
+
+       spec := meta.PartitionSpec()
+       if !spec.IsUnpartitioned() {
+               updates = append(updates, 
table.NewAddPartitionSpecUpdate(&spec, true))
+       } else {
+               updates = append(updates, 
table.NewAddPartitionSpecUpdate(iceberg.UnpartitionedSpec, true))
+       }
+       updates = append(updates, table.NewSetDefaultSpecUpdate(-1))
+
+       order := meta.SortOrder()
+       if !order.IsUnsorted() {
+               updates = append(updates, table.NewAddSortOrderUpdate(&order))
+       } else {
+               updates = append(updates, 
table.NewAddSortOrderUpdate(&table.UnsortedSortOrder))
+       }
+       updates = append(updates, table.NewSetDefaultSortOrderUpdate(-1))
+
+       if loc := meta.Location(); loc != "" {
+               updates = append(updates, table.NewSetLocationUpdate(loc))
+       }
+       if props := meta.Properties(); len(props) > 0 {
+               updates = append(updates, table.NewSetPropertiesUpdate(props))
+       }
+
+       // Append user-provided staged updates at the end.
+       updates = append(updates, stagedUpdates...)
+
+       requirements := []table.Requirement{table.AssertCreate()}
+
+       return r.CommitTable(ctx, identifier, requirements, updates)
+}
+
 func (r *Catalog) CommitTable(ctx context.Context, ident table.Identifier, 
requirements []table.Requirement, updates []table.Update) (table.Metadata, 
string, error) {
        ns, tblName, err := splitIdentForPath(ident)
        if err != nil {
@@ -763,14 +825,18 @@ func (r *Catalog) CommitTable(ctx context.Context, ident 
table.Identifier, requi
 
        ret, err := doPost[payload, commitTableResponse](ctx, r.baseURI, 
[]string{"namespaces", ns, "tables", tblName},
                payload{Identifier: restIdentifier, Requirements: requirements, 
Updates: updates}, r.cl,
-               map[int]error{http.StatusNotFound: catalog.ErrNoSuchTable, 
http.StatusConflict: ErrCommitFailed})
+               map[int]error{
+                       http.StatusNotFound:            catalog.ErrNoSuchTable,
+                       http.StatusConflict:            ErrCommitFailed,
+                       http.StatusInternalServerError: ErrCommitStateUnknown,
+                       http.StatusBadGateway:          ErrCommitStateUnknown,
+                       http.StatusServiceUnavailable:  ErrCommitStateUnknown,
+                       http.StatusGatewayTimeout:      ErrCommitStateUnknown,
+               })
        if err != nil {
                return nil, "", err
        }
 
-       config := maps.Clone(r.props)
-       maps.Copy(config, ret.Metadata.Properties())
-
        return ret.Metadata, ret.MetadataLoc, nil
 }
 
@@ -838,7 +904,14 @@ func (r *Catalog) UpdateTable(ctx context.Context, ident 
table.Identifier, requi
        }
        ret, err := doPost[payload, commitTableResponse](ctx, r.baseURI, 
[]string{"namespaces", ns, "tables", tbl},
                payload{Identifier: restIdentifier, Requirements: requirements, 
Updates: updates}, r.cl,
-               map[int]error{http.StatusNotFound: catalog.ErrNoSuchTable, 
http.StatusConflict: ErrCommitFailed})
+               map[int]error{
+                       http.StatusNotFound:            catalog.ErrNoSuchTable,
+                       http.StatusConflict:            ErrCommitFailed,
+                       http.StatusInternalServerError: ErrCommitStateUnknown,
+                       http.StatusBadGateway:          ErrCommitStateUnknown,
+                       http.StatusServiceUnavailable:  ErrCommitStateUnknown,
+                       http.StatusGatewayTimeout:      ErrCommitStateUnknown,
+               })
        if err != nil {
                return nil, err
        }
diff --git a/catalog/rest/rest_test.go b/catalog/rest/rest_test.go
index 67c6c0d5..4f6f7db4 100644
--- a/catalog/rest/rest_test.go
+++ b/catalog/rest/rest_test.go
@@ -2545,3 +2545,306 @@ func (r *RestTLSCatalogSuite) 
TestCatalogWithCustomTransportAndTlsConfig() {
        r.ErrorContains(err, "invalid catalog config with non-nil tlsConfig and 
transport")
        r.Nil(cat)
 }
+
+// tableMetadataV1JSON returns a minimal valid V1 table metadata JSON
+// suitable for use in test server responses.
+func tableMetadataV1JSON(tableUUID string) string {
+       if tableUUID == "" {
+               tableUUID = "bf289591-dcc0-4234-ad4f-5c3eed811a29"
+       }
+
+       return fmt.Sprintf(`{
+               "format-version": 1,
+               "table-uuid": "%s",
+               "location": "s3://warehouse/db/tbl",
+               "last-updated-ms": 1657810967051,
+               "last-column-id": 3,
+               "schema": {
+                       "type": "struct",
+                       "schema-id": 0,
+                       "fields": [
+                               {"id": 1, "name": "id", "required": true, 
"type": "long"},
+                               {"id": 2, "name": "data", "required": false, 
"type": "string"},
+                               {"id": 3, "name": "ts", "required": false, 
"type": "timestamp"}
+                       ]
+               },
+               "current-schema-id": 0,
+               "schemas": [{
+                       "type": "struct",
+                       "schema-id": 0,
+                       "fields": [
+                               {"id": 1, "name": "id", "required": true, 
"type": "long"},
+                               {"id": 2, "name": "data", "required": false, 
"type": "string"},
+                               {"id": 3, "name": "ts", "required": false, 
"type": "timestamp"}
+                       ]
+               }],
+               "partition-spec": [],
+               "default-spec-id": 0,
+               "last-partition-id": 999,
+               "default-sort-order-id": 0,
+               "sort-orders": [{"order-id": 0, "fields": []}],
+               "properties": {},
+               "current-snapshot-id": -1,
+               "refs": {},
+               "snapshots": [],
+               "snapshot-log": [],
+               "metadata-log": []
+       }`, tableUUID)
+}
+
+func (r *RestCatalogSuite) TestCreateTableStaged() {
+       var createCalled bool
+       var commitCalled bool
+       var lastCreateBody map[string]any
+       var lastCommitBody map[string]any
+
+       r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, req 
*http.Request) {
+               json.NewEncoder(w).Encode(map[string]any{
+                       "access_token": TestToken, "token_type": "Bearer", 
"expires_in": 3600,
+               })
+       })
+
+       // Phase 1: staged create POST
+       r.mux.HandleFunc("/v1/namespaces/db/tables", func(w 
http.ResponseWriter, req *http.Request) {
+               if req.Method != http.MethodPost {
+                       w.WriteHeader(http.StatusMethodNotAllowed)
+
+                       return
+               }
+               createCalled = true
+               json.NewDecoder(req.Body).Decode(&lastCreateBody)
+
+               w.WriteHeader(http.StatusOK)
+               w.Write([]byte(`{
+                       "metadata-location": 
"s3://warehouse/db/tbl/metadata/staged.json",
+                       "metadata": ` + tableMetadataV1JSON("") + `
+               }`))
+       })
+
+       // Phase 2: commit POST — return metadata with the user's UUID
+       // to simulate the server applying the assign-uuid update.
+       testUUID := uuid.MustParse("12345678-1234-1234-1234-123456789abc")
+       r.mux.HandleFunc("/v1/namespaces/db/tables/test_table", func(w 
http.ResponseWriter, req *http.Request) {
+               if req.Method != http.MethodPost {
+                       w.WriteHeader(http.StatusMethodNotAllowed)
+
+                       return
+               }
+               commitCalled = true
+               json.NewDecoder(req.Body).Decode(&lastCommitBody)
+
+               w.WriteHeader(http.StatusOK)
+               w.Write([]byte(`{
+                       "metadata-location": 
"s3://warehouse/db/tbl/metadata/v1.json",
+                       "metadata": ` + tableMetadataV1JSON(testUUID.String()) 
+ `
+               }`))
+       })
+
+       cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
+               rest.WithCredential(TestCreds))
+       r.Require().NoError(err)
+
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+
+       tbl, err := cat.CreateTable(context.Background(),
+               table.Identifier{"db", "test_table"},
+               schema,
+               catalog.WithStagedUpdates(
+                       table.NewAssignUUIDUpdate(testUUID),
+               ),
+       )
+       r.Require().NoError(err)
+       r.NotNil(tbl)
+
+       // Verify phase 1: create request had stage-create=true
+       r.True(createCalled, "create endpoint should be called")
+       stageCreate, _ := lastCreateBody["stage-create"].(bool)
+       r.True(stageCreate, "stage-create should be true")
+
+       // Verify payload
+       r.Equal("test_table", lastCreateBody["name"])
+       r.NotNil(lastCreateBody["schema"], "staged create should include 
schema")
+       r.NotNil(lastCreateBody["write-order"], "staged create should include 
write-order")
+
+       // Verify phase 2: commit request was sent
+       r.True(commitCalled, "commit endpoint should be called")
+
+       // Verify commit has assert-create requirement
+       requirements, ok := lastCommitBody["requirements"].([]any)
+       r.Require().True(ok, "commit should have requirements")
+       r.Require().Len(requirements, 1)
+       req0 := requirements[0].(map[string]any)
+       r.Equal("assert-create", req0["type"])
+
+       // Verify commit updates: server-derived first, user staged last.
+       updates, ok := lastCommitBody["updates"].([]any)
+       r.Require().True(ok, "commit should have updates")
+
+       actionNames := make([]string, len(updates))
+       for i, u := range updates {
+               actionNames[i] = u.(map[string]any)["action"].(string)
+       }
+
+       // Server-derived updates should come first
+       r.Contains(actionNames, "upgrade-format-version")
+       r.Contains(actionNames, "add-schema")
+       r.Contains(actionNames, "set-current-schema")
+       r.Contains(actionNames, "add-spec")
+       r.Contains(actionNames, "set-default-spec")
+       r.Contains(actionNames, "add-sort-order")
+       r.Contains(actionNames, "set-default-sort-order")
+       r.Contains(actionNames, "set-location")
+
+       // User staged update (assign-uuid with our UUID) should be last,
+       // so it takes precedence over the server-derived assign-uuid.
+       lastUpdate := updates[len(updates)-1].(map[string]any)
+       r.Equal("assign-uuid", lastUpdate["action"])
+       r.Equal(testUUID.String(), lastUpdate["uuid"])
+
+       // Verify exactly two assign-uuid entries (server + user) and
+       // user's comes last so it wins.
+       var assignUUIDCount int
+       for _, name := range actionNames {
+               if name == "assign-uuid" {
+                       assignUUIDCount++
+               }
+       }
+       r.Equal(2, assignUUIDCount, "should have server and user assign-uuid")
+
+       // Verify the returned table has the user's UUID, not the server's.
+       r.Equal(testUUID, tbl.Metadata().TableUUID())
+}
+
+func (r *RestCatalogSuite) TestCreateTableNotStaged() {
+       var createBody map[string]any
+       var commitCalled bool
+
+       r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, req 
*http.Request) {
+               json.NewEncoder(w).Encode(map[string]any{
+                       "access_token": TestToken, "token_type": "Bearer", 
"expires_in": 3600,
+               })
+       })
+
+       r.mux.HandleFunc("/v1/namespaces/db/tables", func(w 
http.ResponseWriter, req *http.Request) {
+               json.NewDecoder(req.Body).Decode(&createBody)
+               w.WriteHeader(http.StatusOK)
+               w.Write([]byte(`{
+                       "metadata-location": 
"s3://warehouse/db/tbl/metadata/v1.json",
+                       "metadata": ` + tableMetadataV1JSON("") + `
+               }`))
+       })
+
+       r.mux.HandleFunc("/v1/namespaces/db/tables/test_table", func(w 
http.ResponseWriter, req *http.Request) {
+               commitCalled = true
+               w.WriteHeader(http.StatusOK)
+       })
+
+       cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
+               rest.WithCredential(TestCreds))
+       r.Require().NoError(err)
+
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+
+       _, err = cat.CreateTable(context.Background(),
+               table.Identifier{"db", "test_table"},
+               schema,
+       )
+       r.Require().NoError(err)
+
+       stageCreate, _ := createBody["stage-create"].(bool)
+       r.False(stageCreate, "stage-create should be false without staged 
updates")
+       r.False(commitCalled, "commit should not be called for non-staged 
create")
+}
+
+func (r *RestCatalogSuite) TestCommitTableErrCommitStateUnknown() {
+       var statusCode int
+
+       r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, req 
*http.Request) {
+               json.NewEncoder(w).Encode(map[string]any{
+                       "access_token": TestToken, "token_type": "Bearer", 
"expires_in": 3600,
+               })
+       })
+
+       r.mux.HandleFunc("/v1/namespaces/db/tables/tbl", func(w 
http.ResponseWriter, req *http.Request) {
+               w.WriteHeader(statusCode)
+               json.NewEncoder(w).Encode(map[string]any{
+                       "error": map[string]any{
+                               "message": http.StatusText(statusCode),
+                               "type":    "ServerException",
+                               "code":    statusCode,
+                       },
+               })
+       })
+
+       cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
+               rest.WithCredential(TestCreds))
+       r.Require().NoError(err)
+
+       for _, code := range []int{
+               http.StatusInternalServerError,
+               http.StatusBadGateway,
+               http.StatusServiceUnavailable,
+               http.StatusGatewayTimeout,
+       } {
+               statusCode = code
+               r.Run(strconv.Itoa(code), func() {
+                       _, _, err := cat.CommitTable(context.Background(),
+                               table.Identifier{"db", "tbl"},
+                               []table.Requirement{},
+                               []table.Update{},
+                       )
+                       r.Require().Error(err)
+                       r.ErrorIs(err, rest.ErrCommitStateUnknown,
+                               "%d should return ErrCommitStateUnknown, got: 
%v", code, err)
+               })
+       }
+}
+
+func (r *RestCatalogSuite) TestUpdateTableErrCommitStateUnknown() {
+       var statusCode int
+
+       r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, req 
*http.Request) {
+               json.NewEncoder(w).Encode(map[string]any{
+                       "access_token": TestToken, "token_type": "Bearer", 
"expires_in": 3600,
+               })
+       })
+
+       r.mux.HandleFunc("/v1/namespaces/db/tables/tbl", func(w 
http.ResponseWriter, req *http.Request) {
+               w.WriteHeader(statusCode)
+               json.NewEncoder(w).Encode(map[string]any{
+                       "error": map[string]any{
+                               "message": http.StatusText(statusCode),
+                               "type":    "ServerException",
+                               "code":    statusCode,
+                       },
+               })
+       })
+
+       cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
+               rest.WithCredential(TestCreds))
+       r.Require().NoError(err)
+
+       for _, code := range []int{
+               http.StatusInternalServerError,
+               http.StatusBadGateway,
+               http.StatusServiceUnavailable,
+               http.StatusGatewayTimeout,
+       } {
+               statusCode = code
+               r.Run(strconv.Itoa(code), func() {
+                       _, err := cat.UpdateTable(context.Background(),
+                               table.Identifier{"db", "tbl"},
+                               []table.Requirement{},
+                               []table.Update{},
+                       )
+                       r.Require().Error(err)
+                       r.ErrorIs(err, rest.ErrCommitStateUnknown,
+                               "%d should return ErrCommitStateUnknown, got: 
%v", code, err)
+               })
+       }
+}

Reply via email to