This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 6fab25ce feat(catalog): add TransactionalCatalog interface and REST 
multi-table commit (#787)
6fab25ce is described below

commit 6fab25ce5797164b16022ce1ebc0d97d4311ed2d
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Wed Mar 25 00:25:31 2026 +0100

    feat(catalog): add TransactionalCatalog interface and REST multi-table 
commit (#787)
    
    Add support for atomic multi-table commits via the Iceberg REST spec's
    POST /v1/transactions/commit endpoint.
    
    - Add TableCommit type bundling identifier, requirements, and updates
    - Add TransactionalCatalog optional interface with CommitTransaction
    - Implement CommitTransaction on REST catalog
    - Add Transaction.TableCommit() to extract pending changes without
    committing
    - Add Transaction.MarkCommitted() for use after multi-table commit
    
    Closes #784
---
 catalog/catalog.go                      |  18 ++++
 catalog/multi_table_transaction.go      | 137 ++++++++++++++++++++++++++++
 catalog/multi_table_transaction_test.go |  99 ++++++++++++++++++++
 catalog/rest/rest.go                    |  68 +++++++++++++-
 catalog/rest/rest_integration_test.go   |  95 +++++++++++++++++++
 catalog/rest/rest_test.go               | 157 ++++++++++++++++++++++++++++++++
 table/commit.go                         |  91 ++++++++++++++++++
 table/commit_test.go                    | 138 ++++++++++++++++++++++++++++
 8 files changed, 802 insertions(+), 1 deletion(-)

diff --git a/catalog/catalog.go b/catalog/catalog.go
index cc93e5f2..bfa57350 100644
--- a/catalog/catalog.go
+++ b/catalog/catalog.go
@@ -60,6 +60,8 @@ var (
        ErrNamespaceNotEmpty      = errors.New("namespace is not empty")
        ErrNoSuchView             = errors.New("view does not exist")
        ErrViewAlreadyExists      = errors.New("view already exists")
+       ErrEmptyCommitList        = errors.New("commit list must not be empty")
+       ErrMissingIdentifier      = errors.New("every table commit must have a 
valid identifier")
 )
 
 type PropertiesUpdateSummary struct {
@@ -148,6 +150,22 @@ type Catalog interface {
                removals []string, updates iceberg.Properties) 
(PropertiesUpdateSummary, error)
 }
 
+// TransactionalCatalog is an optional interface implemented by catalogs
+// that support atomic multi-table commits. Callers should check for this
+// capability via a type assertion:
+//
+//     if tc, ok := cat.(catalog.TransactionalCatalog); ok {
+//         err := tc.CommitTransaction(ctx, commits)
+//     }
+//
+// The endpoint is all-or-nothing: either all table changes are applied
+// atomically, or none are. On success the method returns nil; the caller
+// must LoadTable individually to obtain updated metadata because the
+// server returns 204 No Content.
+type TransactionalCatalog interface {
+       CommitTransaction(ctx context.Context, commits []table.TableCommit) 
error
+}
+
 func ToIdentifier(ident ...string) table.Identifier {
        if len(ident) == 1 {
                if ident[0] == "" {
diff --git a/catalog/multi_table_transaction.go 
b/catalog/multi_table_transaction.go
new file mode 100644
index 00000000..d391fc9d
--- /dev/null
+++ b/catalog/multi_table_transaction.go
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package catalog
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "slices"
+       "strings"
+
+       "github.com/apache/iceberg-go/table"
+)
+
+// MultiTableTransaction collects changes across multiple tables and
+// commits them atomically via [TransactionalCatalog.CommitTransaction].
+//
+// A MultiTableTransaction must not be used concurrently from multiple
+// goroutines.
+//
+// Usage:
+//
+//     mtx, err := catalog.NewMultiTableTransaction(cat)
+//     // ... err check ...
+//
+//     tx1 := tbl1.NewTransaction()
+//     tx1.SetProperties(map[string]string{"key": "val"})
+//     mtx.AddTransaction(tx1)
+//
+//     tx2 := tbl2.NewTransaction()
+//     // ... build changes on tx2 ...
+//     mtx.AddTransaction(tx2)
+//
+//     err = mtx.Commit(ctx)
+type MultiTableTransaction struct {
+       cat       TransactionalCatalog
+       txns      []*table.Transaction
+       idents    []string
+       committed bool
+}
+
+// NewMultiTableTransaction creates a new multi-table transaction backed
+// by the given catalog. Returns an error if the catalog does not
+// implement [TransactionalCatalog].
+func NewMultiTableTransaction(cat Catalog) (*MultiTableTransaction, error) {
+       tc, ok := cat.(TransactionalCatalog)
+       if !ok {
+               return nil, errors.New("catalog does not support multi-table 
transactions")
+       }
+
+       return &MultiTableTransaction{cat: tc}, nil
+}
+
+// AddTransaction adds a table transaction to be committed atomically
+// with all other transactions in this multi-table transaction.
+// Returns an error if the transaction is nil, already committed, or
+// targets a table that was already added.
+func (m *MultiTableTransaction) AddTransaction(tx *table.Transaction) error {
+       if tx == nil {
+               return errors.New("transaction must not be nil")
+       }
+
+       if m.committed {
+               return errors.New("multi-table transaction has already been 
committed")
+       }
+
+       tc, err := tx.TableCommit()
+       if err != nil {
+               return err
+       }
+
+       key := strings.Join(tc.Identifier, ".")
+       if slices.Contains(m.idents, key) {
+               return fmt.Errorf("duplicate table in multi-table transaction: 
%s", key)
+       }
+
+       m.txns = append(m.txns, tx)
+       m.idents = append(m.idents, key)
+
+       return nil
+}
+
+// Commit extracts pending changes from all added transactions and
+// commits them atomically. On success, all transactions are marked
+// as committed. On failure, no transactions are marked committed
+// and the caller may retry.
+//
+// PostCommit hooks are not executed. Because the multi-table commit
+// endpoint returns 204 No Content, callers must LoadTable individually
+// to obtain updated metadata.
+func (m *MultiTableTransaction) Commit(ctx context.Context) error {
+       if m.committed {
+               return errors.New("multi-table transaction has already been 
committed")
+       }
+
+       if len(m.txns) == 0 {
+               return ErrEmptyCommitList
+       }
+
+       commits := make([]table.TableCommit, 0, len(m.txns))
+       for _, tx := range m.txns {
+               tc, err := tx.TableCommit()
+               if err != nil {
+                       return err
+               }
+
+               commits = append(commits, tc)
+       }
+
+       if err := m.cat.CommitTransaction(ctx, commits); err != nil {
+               return err
+       }
+
+       m.committed = true
+
+       // Mark all transactions as committed to prevent reuse.
+       for _, tx := range m.txns {
+               tx.MarkCommitted()
+       }
+
+       return nil
+}
diff --git a/catalog/multi_table_transaction_test.go 
b/catalog/multi_table_transaction_test.go
new file mode 100644
index 00000000..666048b8
--- /dev/null
+++ b/catalog/multi_table_transaction_test.go
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package catalog
+
+import (
+       "context"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/table"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+type stubCatalog struct {
+       commits [][]table.TableCommit
+}
+
+func (s *stubCatalog) CommitTransaction(_ context.Context, commits 
[]table.TableCommit) error {
+       s.commits = append(s.commits, commits)
+
+       return nil
+}
+
+func mtxTestTable(t *testing.T, ns, name string) *table.Table {
+       t.Helper()
+
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+
+       meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec,
+               table.UnsortedSortOrder, "s3://bucket/test",
+               iceberg.Properties{table.PropertyFormatVersion: "2"})
+       require.NoError(t, err)
+
+       return table.New(table.Identifier{ns, name}, meta, "", nil, nil)
+}
+
+func TestMultiTableTransaction(t *testing.T) {
+       stub := &stubCatalog{}
+       mtx := &MultiTableTransaction{cat: stub}
+
+       // Add two tables with changes.
+       tx1 := mtxTestTable(t, "db", "t1").NewTransaction()
+       require.NoError(t, tx1.SetProperties(map[string]string{"k": "v"}))
+       require.NoError(t, mtx.AddTransaction(tx1))
+
+       tx2 := mtxTestTable(t, "db", "t2").NewTransaction()
+       require.NoError(t, tx2.SetProperties(map[string]string{"k": "v"}))
+       require.NoError(t, mtx.AddTransaction(tx2))
+
+       // Duplicate table is rejected.
+       assert.ErrorContains(t, mtx.AddTransaction(mtxTestTable(t, "db", 
"t1").NewTransaction()), "duplicate table")
+
+       // Already-committed transaction is rejected.
+       tx3 := mtxTestTable(t, "db", "t3").NewTransaction()
+       tx3.MarkCommitted()
+       assert.ErrorContains(t, mtx.AddTransaction(tx3), "already been 
committed")
+
+       // Commit sends both to the catalog and marks transactions done.
+       require.NoError(t, mtx.Commit(context.Background()))
+       require.Len(t, stub.commits, 1)
+       assert.Len(t, stub.commits[0], 2)
+
+       _, err := tx1.TableCommit()
+       assert.ErrorContains(t, err, "already been committed")
+
+       // Double commit and add-after-commit are rejected.
+       assert.ErrorContains(t, mtx.Commit(context.Background()), "already been 
committed")
+       assert.ErrorContains(t, mtx.AddTransaction(mtxTestTable(t, "db", 
"t4").NewTransaction()), "already been committed")
+}
+
+func TestMultiTableTransactionEmpty(t *testing.T) {
+       mtx := &MultiTableTransaction{cat: &stubCatalog{}}
+       assert.ErrorIs(t, mtx.Commit(context.Background()), ErrEmptyCommitList)
+}
+
+func TestNewMultiTableTransactionNonTransactional(t *testing.T) {
+       // nil doesn't implement TransactionalCatalog
+       _, err := NewMultiTableTransaction(nil)
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "does not support")
+}
diff --git a/catalog/rest/rest.go b/catalog/rest/rest.go
index 41ead570..dd181969 100644
--- a/catalog/rest/rest.go
+++ b/catalog/rest/rest.go
@@ -48,7 +48,10 @@ import (
        "golang.org/x/oauth2/clientcredentials"
 )
 
-var _ catalog.Catalog = (*Catalog)(nil)
+var (
+       _ catalog.Catalog              = (*Catalog)(nil)
+       _ catalog.TransactionalCatalog = (*Catalog)(nil)
+)
 
 const (
        pageSizeKey contextKey = "page_size"
@@ -885,6 +888,69 @@ func (r *Catalog) CommitTable(ctx context.Context, ident 
table.Identifier, requi
        return ret.Metadata, ret.MetadataLoc, nil
 }
 
+// CommitTransaction atomically commits changes to multiple tables in a
+// single request. It implements [catalog.TransactionalCatalog].
+//
+// The server applies all changes or none (all-or-nothing). On success
+// (204 No Content) the method returns nil. Callers must LoadTable
+// individually to obtain updated metadata.
+func (r *Catalog) CommitTransaction(ctx context.Context, commits 
[]table.TableCommit) error {
+       if len(commits) == 0 {
+               return catalog.ErrEmptyCommitList
+       }
+
+       type tableChange struct {
+               Identifier   identifier          `json:"identifier"`
+               Requirements []table.Requirement `json:"requirements"`
+               Updates      []table.Update      `json:"updates"`
+       }
+
+       type payload struct {
+               TableChanges []tableChange `json:"table-changes"`
+       }
+
+       changes := make([]tableChange, len(commits))
+       for i, c := range commits {
+               if len(c.Identifier) == 0 {
+                       return catalog.ErrMissingIdentifier
+               }
+
+               reqs := c.Requirements
+               if reqs == nil {
+                       reqs = []table.Requirement{}
+               }
+               updates := c.Updates
+               if updates == nil {
+                       updates = []table.Update{}
+               }
+
+               changes[i] = tableChange{
+                       Identifier: identifier{
+                               Namespace: 
catalog.NamespaceFromIdent(c.Identifier),
+                               Name:      
catalog.TableNameFromIdent(c.Identifier),
+                       },
+                       Requirements: reqs,
+                       Updates:      updates,
+               }
+       }
+
+       _, err := doPostAllowNoContent[payload, struct{}](
+               ctx, r.baseURI, []string{"transactions", "commit"},
+               payload{TableChanges: changes}, r.cl,
+               map[int]error{
+                       http.StatusNotFound:            catalog.ErrNoSuchTable,
+                       http.StatusConflict:            ErrCommitFailed,
+                       http.StatusInternalServerError: ErrCommitStateUnknown,
+                       http.StatusBadGateway:          ErrCommitStateUnknown,
+                       http.StatusServiceUnavailable:  ErrCommitStateUnknown,
+                       http.StatusGatewayTimeout:      ErrCommitStateUnknown,
+               },
+               true,
+       )
+
+       return err
+}
+
 func (r *Catalog) RegisterTable(ctx context.Context, identifier 
table.Identifier, metadataLoc string) (*table.Table, error) {
        ns, tbl, err := splitIdentForPath(identifier)
        if err != nil {
diff --git a/catalog/rest/rest_integration_test.go 
b/catalog/rest/rest_integration_test.go
index ca156f62..c0a7d217 100644
--- a/catalog/rest/rest_integration_test.go
+++ b/catalog/rest/rest_integration_test.go
@@ -358,6 +358,101 @@ func (s *RestIntegrationSuite) TestWriteCommitTable() {
        s.Equal(pqfile, entries[0].DataFile().FilePath())
 }
 
+func (s *RestIntegrationSuite) writeParquetFile(tbl *table.Table, schema 
*iceberg.Schema, subpath string) string {
+       s.T().Helper()
+
+       arrSchema, err := table.SchemaToArrowSchema(schema, nil, false, false)
+       s.Require().NoError(err)
+
+       arrowTbl, err := array.TableFromJSON(memory.DefaultAllocator, arrSchema,
+               []string{`[
+               {"foo": "hello", "bar": 1, "baz": true},
+               {"foo": "world", "bar": 2, "baz": false}
+       ]`})
+       s.Require().NoError(err)
+       defer arrowTbl.Release()
+
+       pqfile, err := url.JoinPath(tbl.Location(), "data", subpath, 
"test.parquet")
+       s.Require().NoError(err)
+
+       fw, err := mustFS(s.T(), tbl).(io.WriteFileIO).Create(pqfile)
+       s.Require().NoError(err)
+       s.Require().NoError(pqarrow.WriteTable(arrowTbl, fw, arrowTbl.NumRows(),
+               nil, pqarrow.DefaultWriterProps()))
+
+       return pqfile
+}
+
+func (s *RestIntegrationSuite) TestMultiTableCommit() {
+       s.ensureNamespace()
+
+       const location = "s3://warehouse/iceberg"
+       tbl1Ident := catalog.ToIdentifier(TestNamespaceIdent, 
"multi-txn-table-1")
+       tbl2Ident := catalog.ToIdentifier(TestNamespaceIdent, 
"multi-txn-table-2")
+
+       tbl1, err := s.cat.CreateTable(s.ctx, tbl1Ident, tableSchemaSimple,
+               catalog.WithLocation(location+"/multi-txn-1"))
+       s.Require().NoError(err)
+       s.Require().NotNil(tbl1)
+       defer func() {
+               s.Require().NoError(s.cat.DropTable(s.ctx, tbl1Ident))
+       }()
+
+       tbl2, err := s.cat.CreateTable(s.ctx, tbl2Ident, tableSchemaSimple,
+               catalog.WithLocation(location+"/multi-txn-2"))
+       s.Require().NoError(err)
+       s.Require().NotNil(tbl2)
+       defer func() {
+               s.Require().NoError(s.cat.DropTable(s.ctx, tbl2Ident))
+       }()
+
+       // Write a parquet data file for each table.
+       pq1 := s.writeParquetFile(tbl1, tableSchemaSimple, "multi-txn-1")
+       defer mustFS(s.T(), tbl1).Remove(pq1)
+
+       pq2 := s.writeParquetFile(tbl2, tableSchemaSimple, "multi-txn-2")
+       defer mustFS(s.T(), tbl2).Remove(pq2)
+
+       // Build transactions that add data files to each table.
+       tx1 := tbl1.NewTransaction()
+       s.Require().NoError(tx1.AddFiles(s.ctx, []string{pq1}, nil, false))
+
+       tx2 := tbl2.NewTransaction()
+       s.Require().NoError(tx2.AddFiles(s.ctx, []string{pq2}, nil, false))
+
+       // Commit both atomically via MultiTableTransaction.
+       mtx, err := catalog.NewMultiTableTransaction(s.cat)
+       s.Require().NoError(err)
+       s.Require().NoError(mtx.AddTransaction(tx1))
+       s.Require().NoError(mtx.AddTransaction(tx2))
+       s.Require().NoError(mtx.Commit(s.ctx))
+
+       // Reload tables and verify each has one snapshot with the data file.
+       loaded1, err := s.cat.LoadTable(s.ctx, tbl1Ident)
+       s.Require().NoError(err)
+       s.Require().NotNil(loaded1.CurrentSnapshot())
+
+       mf1 := []iceberg.ManifestFile{}
+       for m, err := range loaded1.AllManifests(s.ctx) {
+               s.Require().NoError(err)
+               mf1 = append(mf1, m)
+       }
+       s.Len(mf1, 1)
+       s.EqualValues(1, mf1[0].AddedDataFiles())
+
+       loaded2, err := s.cat.LoadTable(s.ctx, tbl2Ident)
+       s.Require().NoError(err)
+       s.Require().NotNil(loaded2.CurrentSnapshot())
+
+       mf2 := []iceberg.ManifestFile{}
+       for m, err := range loaded2.AllManifests(s.ctx) {
+               s.Require().NoError(err)
+               mf2 = append(mf2, m)
+       }
+       s.Len(mf2, 1)
+       s.EqualValues(1, mf2[0].AddedDataFiles())
+}
+
 func mustFS(t *testing.T, tbl *table.Table) io.IO {
        r, err := tbl.FS(context.Background())
        require.NoError(t, err)
diff --git a/catalog/rest/rest_test.go b/catalog/rest/rest_test.go
index ae8e16aa..8d223bf6 100644
--- a/catalog/rest/rest_test.go
+++ b/catalog/rest/rest_test.go
@@ -3000,3 +3000,160 @@ func (r *RestCatalogSuite) 
TestUpdateTableErrCommitStateUnknown() {
                })
        }
 }
+
+func (r *RestCatalogSuite) TestCommitTransactionSuccess() {
+       r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, req 
*http.Request) {
+               w.Header().Set("Content-Type", "application/json")
+               json.NewEncoder(w).Encode(map[string]any{
+                       "access_token": TestToken, "token_type": "Bearer", 
"expires_in": 3600,
+               })
+       })
+
+       var receivedBody map[string]any
+       r.mux.HandleFunc("/v1/transactions/commit", func(w http.ResponseWriter, 
req *http.Request) {
+               r.Equal(http.MethodPost, req.Method)
+               r.NoError(json.NewDecoder(req.Body).Decode(&receivedBody))
+               w.WriteHeader(http.StatusNoContent)
+       })
+
+       cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
+               rest.WithCredential(TestCreds))
+       r.Require().NoError(err)
+
+       // Verify the interface is satisfied when accessed through 
catalog.Catalog.
+       var catIface catalog.Catalog = cat
+       _, ok := catIface.(catalog.TransactionalCatalog)
+       r.Require().True(ok, "REST catalog must implement TransactionalCatalog")
+
+       err = cat.CommitTransaction(context.Background(), []table.TableCommit{
+               {
+                       Identifier:   table.Identifier{"db", "t1"},
+                       Requirements: []table.Requirement{},
+                       Updates:      []table.Update{},
+               },
+               {
+                       Identifier:   table.Identifier{"db", "t2"},
+                       Requirements: []table.Requirement{},
+                       Updates:      []table.Update{},
+               },
+       })
+       r.Require().NoError(err)
+
+       // Verify request body structure
+       r.Contains(receivedBody, "table-changes")
+       changes, ok := receivedBody["table-changes"].([]any)
+       r.True(ok)
+       r.Len(changes, 2)
+
+       first := changes[0].(map[string]any)
+       ident := first["identifier"].(map[string]any)
+       r.Equal("t1", ident["name"])
+
+       ns := ident["namespace"].([]any)
+       r.Len(ns, 1)
+       r.Equal("db", ns[0])
+}
+
+func (r *RestCatalogSuite) TestCommitTransactionEmptyList() {
+       r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, req 
*http.Request) {
+               w.Header().Set("Content-Type", "application/json")
+               json.NewEncoder(w).Encode(map[string]any{
+                       "access_token": TestToken, "token_type": "Bearer", 
"expires_in": 3600,
+               })
+       })
+
+       cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
+               rest.WithCredential(TestCreds))
+       r.Require().NoError(err)
+
+       err = cat.CommitTransaction(context.Background(), []table.TableCommit{})
+       r.ErrorIs(err, catalog.ErrEmptyCommitList)
+}
+
+func (r *RestCatalogSuite) TestCommitTransactionMissingIdentifier() {
+       r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, req 
*http.Request) {
+               w.Header().Set("Content-Type", "application/json")
+               json.NewEncoder(w).Encode(map[string]any{
+                       "access_token": TestToken, "token_type": "Bearer", 
"expires_in": 3600,
+               })
+       })
+
+       cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
+               rest.WithCredential(TestCreds))
+       r.Require().NoError(err)
+
+       err = cat.CommitTransaction(context.Background(), []table.TableCommit{
+               {Identifier: nil, Requirements: []table.Requirement{}, Updates: 
[]table.Update{}},
+       })
+       r.ErrorIs(err, catalog.ErrMissingIdentifier)
+}
+
+func (r *RestCatalogSuite) TestCommitTransactionConflict() {
+       r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, req 
*http.Request) {
+               w.Header().Set("Content-Type", "application/json")
+               json.NewEncoder(w).Encode(map[string]any{
+                       "access_token": TestToken, "token_type": "Bearer", 
"expires_in": 3600,
+               })
+       })
+
+       r.mux.HandleFunc("/v1/transactions/commit", func(w http.ResponseWriter, 
req *http.Request) {
+               w.WriteHeader(http.StatusConflict)
+               json.NewEncoder(w).Encode(map[string]any{
+                       "error": map[string]any{
+                               "message": "Conflict", "type": 
"CommitFailedException", "code": 409,
+                       },
+               })
+       })
+
+       cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
+               rest.WithCredential(TestCreds))
+       r.Require().NoError(err)
+
+       err = cat.CommitTransaction(context.Background(), []table.TableCommit{
+               {Identifier: table.Identifier{"db", "t1"}, Requirements: 
[]table.Requirement{}, Updates: []table.Update{}},
+       })
+       r.ErrorIs(err, rest.ErrCommitFailed)
+}
+
+func (r *RestCatalogSuite) TestCommitTransactionErrCommitStateUnknown() {
+       var statusCode int
+
+       r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, req 
*http.Request) {
+               w.Header().Set("Content-Type", "application/json")
+               json.NewEncoder(w).Encode(map[string]any{
+                       "access_token": TestToken, "token_type": "Bearer", 
"expires_in": 3600,
+               })
+       })
+
+       r.mux.HandleFunc("/v1/transactions/commit", func(w http.ResponseWriter, 
req *http.Request) {
+               w.WriteHeader(statusCode)
+               json.NewEncoder(w).Encode(map[string]any{
+                       "error": map[string]any{
+                               "message": http.StatusText(statusCode),
+                               "type":    "ServerException",
+                               "code":    statusCode,
+                       },
+               })
+       })
+
+       cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
+               rest.WithCredential(TestCreds))
+       r.Require().NoError(err)
+
+       for _, code := range []int{
+               http.StatusInternalServerError,
+               http.StatusBadGateway,
+               http.StatusServiceUnavailable,
+               http.StatusGatewayTimeout,
+       } {
+               statusCode = code
+               r.Run(strconv.Itoa(code), func() {
+                       err := cat.CommitTransaction(context.Background(), 
[]table.TableCommit{
+                               {Identifier: table.Identifier{"db", "t1"}, 
Requirements: []table.Requirement{}, Updates: []table.Update{}},
+                       })
+                       r.Require().Error(err)
+                       r.ErrorIs(err, rest.ErrCommitStateUnknown,
+                               "%d should return ErrCommitStateUnknown, got: 
%v", code, err)
+               })
+       }
+}
diff --git a/table/commit.go b/table/commit.go
new file mode 100644
index 00000000..98df8463
--- /dev/null
+++ b/table/commit.go
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import "errors"
+
+// TableCommit holds the identifier, requirements, and updates for a single
+// table within a multi-table transaction. It is used with
+// [catalog.TransactionalCatalog.CommitTransaction] to atomically commit
+// changes across multiple tables.
+type TableCommit struct {
+       Identifier   Identifier
+       Requirements []Requirement
+       Updates      []Update
+}
+
+// TableCommit returns a TableCommit representing the pending changes in this
+// transaction without actually committing them. This is intended for
+// multi-table transactions where several TableCommit values are collected
+// and submitted together via [catalog.TransactionalCatalog.CommitTransaction].
+//
+// Most callers should use [catalog.MultiTableTransaction] instead of calling
+// this method directly — it handles extraction, commit, and lifecycle
+// management automatically.
+//
+// The method automatically includes an AssertTableUUID requirement, matching
+// the behavior of [Transaction.Commit].
+//
+// TableCommit does not mark the transaction as committed — the caller is
+// responsible for either calling Commit (single-table) or submitting the
+// returned TableCommit via CommitTransaction (multi-table). After a
+// successful multi-table commit the caller should call MarkCommitted to
+// prevent accidental reuse.
+//
+// PostCommit hooks are NOT executed by this method. Because the multi-table
+// commit endpoint returns 204 No Content (no metadata), callers must
+// LoadTable after a successful CommitTransaction if they need updated state.
+func (t *Transaction) TableCommit() (TableCommit, error) {
+       t.mx.Lock()
+       defer t.mx.Unlock()
+
+       if t.committed {
+               return TableCommit{}, errors.New("transaction has already been 
committed")
+       }
+
+       if len(t.meta.updates) == 0 {
+               return TableCommit{
+                       Identifier:   t.tbl.identifier,
+                       Requirements: []Requirement{},
+                       Updates:      []Update{},
+               }, nil
+       }
+
+       reqs := make([]Requirement, len(t.reqs), len(t.reqs)+1)
+       copy(reqs, t.reqs)
+       reqs = append(reqs, AssertTableUUID(t.meta.uuid))
+
+       updates := make([]Update, len(t.meta.updates))
+       copy(updates, t.meta.updates)
+
+       return TableCommit{
+               Identifier:   t.tbl.identifier,
+               Requirements: reqs,
+               Updates:      updates,
+       }, nil
+}
+
+// MarkCommitted marks the transaction as committed, preventing further use.
+// This should be called after a successful multi-table commit via
+// [catalog.TransactionalCatalog.CommitTransaction].
+func (t *Transaction) MarkCommitted() {
+       t.mx.Lock()
+       defer t.mx.Unlock()
+
+       t.committed = true
+}
diff --git a/table/commit_test.go b/table/commit_test.go
new file mode 100644
index 00000000..eb4a3240
--- /dev/null
+++ b/table/commit_test.go
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/table"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func newCommitTestTable(t *testing.T) *table.Table {
+       t.Helper()
+
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+
+       meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec,
+               table.UnsortedSortOrder, "s3://bucket/test",
+               iceberg.Properties{table.PropertyFormatVersion: "2"})
+       require.NoError(t, err)
+
+       return table.New(
+               table.Identifier{"db", "test_table"},
+               meta, "s3://bucket/test/metadata/v1.metadata.json",
+               nil, nil,
+       )
+}
+
+func TestTableCommitFromEmptyTransaction(t *testing.T) {
+       tbl := newCommitTestTable(t)
+       tx := tbl.NewTransaction()
+
+       tc, err := tx.TableCommit()
+       require.NoError(t, err)
+
+       assert.Equal(t, table.Identifier{"db", "test_table"}, tc.Identifier)
+       assert.NotNil(t, tc.Requirements, "Requirements must be non-nil empty 
slice for JSON serialization")
+       assert.NotNil(t, tc.Updates, "Updates must be non-nil empty slice for 
JSON serialization")
+       assert.Empty(t, tc.Requirements)
+       assert.Empty(t, tc.Updates)
+}
+
+func TestTableCommitAfterCommitFails(t *testing.T) {
+       tbl := newCommitTestTable(t)
+       tx := tbl.NewTransaction()
+
+       tx.MarkCommitted()
+
+       _, err := tx.TableCommit()
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "already been committed")
+}
+
+func TestTableCommitIncludesAssertTableUUID(t *testing.T) {
+       tbl := newCommitTestTable(t)
+       tx := tbl.NewTransaction()
+
+       require.NoError(t, tx.SetProperties(map[string]string{"key": "value"}))
+
+       tc, err := tx.TableCommit()
+       require.NoError(t, err)
+
+       assert.NotEmpty(t, tc.Updates)
+
+       var hasUUIDAssert bool
+       for _, r := range tc.Requirements {
+               if r.GetType() == "assert-table-uuid" {
+                       hasUUIDAssert = true
+
+                       break
+               }
+       }
+       assert.True(t, hasUUIDAssert, "expected AssertTableUUID requirement")
+}
+
+func TestTableCommitDoesNotMarkTransactionCommitted(t *testing.T) {
+       tbl := newCommitTestTable(t)
+       tx := tbl.NewTransaction()
+
+       require.NoError(t, tx.SetProperties(map[string]string{"key": "value"}))
+
+       _, err := tx.TableCommit()
+       require.NoError(t, err)
+
+       // Second call should also succeed — not marked as committed
+       _, err = tx.TableCommit()
+       require.NoError(t, err)
+}
+
+func TestTableCommitReturnsCopies(t *testing.T) {
+       tbl := newCommitTestTable(t)
+       tx := tbl.NewTransaction()
+
+       require.NoError(t, tx.SetProperties(map[string]string{"key": "value"}))
+
+       tc1, err := tx.TableCommit()
+       require.NoError(t, err)
+
+       tc2, err := tx.TableCommit()
+       require.NoError(t, err)
+
+       // Mutating one should not affect the other
+       if len(tc1.Requirements) > 0 {
+               tc1.Requirements = tc1.Requirements[:0]
+       }
+       assert.NotEmpty(t, tc2.Requirements)
+}
+
+func TestMarkCommittedPreventsCommit(t *testing.T) {
+       tbl := newCommitTestTable(t)
+       tx := tbl.NewTransaction()
+
+       tx.MarkCommitted()
+
+       _, err := tx.Commit(t.Context())
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "already been committed")
+}

Reply via email to