This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 6fab25ce feat(catalog): add TransactionalCatalog interface and REST
multi-table commit (#787)
6fab25ce is described below
commit 6fab25ce5797164b16022ce1ebc0d97d4311ed2d
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Wed Mar 25 00:25:31 2026 +0100
feat(catalog): add TransactionalCatalog interface and REST multi-table
commit (#787)
Add support for atomic multi-table commits via the Iceberg REST spec's
POST /v1/transactions/commit endpoint.
- Add TableCommit type bundling identifier, requirements, and updates
- Add TransactionalCatalog optional interface with CommitTransaction
- Implement CommitTransaction on REST catalog
- Add Transaction.TableCommit() to extract pending changes without
committing
- Add Transaction.MarkCommitted() for use after multi-table commit
Closes #784
---
catalog/catalog.go | 18 ++++
catalog/multi_table_transaction.go | 137 ++++++++++++++++++++++++++++
catalog/multi_table_transaction_test.go | 99 ++++++++++++++++++++
catalog/rest/rest.go | 68 +++++++++++++-
catalog/rest/rest_integration_test.go | 95 +++++++++++++++++++
catalog/rest/rest_test.go | 157 ++++++++++++++++++++++++++++++++
table/commit.go | 91 ++++++++++++++++++
table/commit_test.go | 138 ++++++++++++++++++++++++++++
8 files changed, 802 insertions(+), 1 deletion(-)
diff --git a/catalog/catalog.go b/catalog/catalog.go
index cc93e5f2..bfa57350 100644
--- a/catalog/catalog.go
+++ b/catalog/catalog.go
@@ -60,6 +60,8 @@ var (
ErrNamespaceNotEmpty = errors.New("namespace is not empty")
ErrNoSuchView = errors.New("view does not exist")
ErrViewAlreadyExists = errors.New("view already exists")
+ ErrEmptyCommitList = errors.New("commit list must not be empty")
+ ErrMissingIdentifier = errors.New("every table commit must have a
valid identifier")
)
type PropertiesUpdateSummary struct {
@@ -148,6 +150,22 @@ type Catalog interface {
removals []string, updates iceberg.Properties)
(PropertiesUpdateSummary, error)
}
+// TransactionalCatalog is an optional interface implemented by catalogs
+// that support atomic multi-table commits. It is not part of [Catalog],
+// so callers should check for the capability via a type assertion:
+//
+//	if tc, ok := cat.(catalog.TransactionalCatalog); ok {
+//		err := tc.CommitTransaction(ctx, commits)
+//	}
+//
+// The commit is all-or-nothing: either all table changes are applied
+// atomically, or none are. On success the method returns nil and no
+// metadata (the REST endpoint answers 204 No Content), so the caller
+// must LoadTable each table individually to obtain updated metadata.
+type TransactionalCatalog interface {
+ // CommitTransaction submits every entry of commits as one atomic
+ // unit and returns nil only if the whole set was applied.
+ CommitTransaction(ctx context.Context, commits []table.TableCommit)
error
+}
+
func ToIdentifier(ident ...string) table.Identifier {
if len(ident) == 1 {
if ident[0] == "" {
diff --git a/catalog/multi_table_transaction.go
b/catalog/multi_table_transaction.go
new file mode 100644
index 00000000..d391fc9d
--- /dev/null
+++ b/catalog/multi_table_transaction.go
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package catalog
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "slices"
+ "strings"
+
+ "github.com/apache/iceberg-go/table"
+)
+
+// MultiTableTransaction collects changes across multiple tables and
+// commits them atomically via [TransactionalCatalog.CommitTransaction].
+//
+// A MultiTableTransaction must not be used concurrently from multiple
+// goroutines.
+//
+// Usage:
+//
+//	mtx, err := catalog.NewMultiTableTransaction(cat)
+//	// ... err check ...
+//
+//	tx1 := tbl1.NewTransaction()
+//	tx1.SetProperties(map[string]string{"key": "val"})
+//	if err := mtx.AddTransaction(tx1); err != nil {
+//		// nil, already committed, or duplicate table
+//	}
+//
+//	tx2 := tbl2.NewTransaction()
+//	// ... build changes on tx2 ...
+//	if err := mtx.AddTransaction(tx2); err != nil {
+//		// ...
+//	}
+//
+//	err = mtx.Commit(ctx)
+type MultiTableTransaction struct {
+ // cat is the catalog that performs the atomic commit.
+ cat TransactionalCatalog
+ // txns holds the added table transactions in insertion order.
+ txns []*table.Transaction
+ // idents holds one key per entry of txns, derived from that
+ // transaction's table identifier; it is used to reject duplicates.
+ idents []string
+ // committed is set once Commit has succeeded; it blocks further
+ // AddTransaction and Commit calls.
+ committed bool
+}
+
+// NewMultiTableTransaction creates a new multi-table transaction backed
+// by the given catalog. Returns an error if the catalog does not
+// implement [TransactionalCatalog].
+func NewMultiTableTransaction(cat Catalog) (*MultiTableTransaction, error) {
+ tc, ok := cat.(TransactionalCatalog)
+ if !ok {
+ return nil, errors.New("catalog does not support multi-table
transactions")
+ }
+
+ return &MultiTableTransaction{cat: tc}, nil
+}
+
+// AddTransaction adds a table transaction to be committed atomically
+// with all other transactions in this multi-table transaction.
+// Returns an error if the transaction is nil, already committed, or
+// targets a table that was already added.
+func (m *MultiTableTransaction) AddTransaction(tx *table.Transaction) error {
+ if tx == nil {
+ return errors.New("transaction must not be nil")
+ }
+
+ if m.committed {
+ return errors.New("multi-table transaction has already been
committed")
+ }
+
+ tc, err := tx.TableCommit()
+ if err != nil {
+ return err
+ }
+
+ key := strings.Join(tc.Identifier, ".")
+ if slices.Contains(m.idents, key) {
+ return fmt.Errorf("duplicate table in multi-table transaction:
%s", key)
+ }
+
+ m.txns = append(m.txns, tx)
+ m.idents = append(m.idents, key)
+
+ return nil
+}
+
+// Commit extracts pending changes from all added transactions and
+// commits them atomically. On success, all transactions are marked
+// as committed. On failure, no transactions are marked committed
+// and the caller may retry.
+//
+// PostCommit hooks are not executed. Because the multi-table commit
+// endpoint returns 204 No Content, callers must LoadTable individually
+// to obtain updated metadata.
+func (m *MultiTableTransaction) Commit(ctx context.Context) error {
+ if m.committed {
+ return errors.New("multi-table transaction has already been
committed")
+ }
+
+ if len(m.txns) == 0 {
+ return ErrEmptyCommitList
+ }
+
+ commits := make([]table.TableCommit, 0, len(m.txns))
+ for _, tx := range m.txns {
+ tc, err := tx.TableCommit()
+ if err != nil {
+ return err
+ }
+
+ commits = append(commits, tc)
+ }
+
+ if err := m.cat.CommitTransaction(ctx, commits); err != nil {
+ return err
+ }
+
+ m.committed = true
+
+ // Mark all transactions as committed to prevent reuse.
+ for _, tx := range m.txns {
+ tx.MarkCommitted()
+ }
+
+ return nil
+}
diff --git a/catalog/multi_table_transaction_test.go
b/catalog/multi_table_transaction_test.go
new file mode 100644
index 00000000..666048b8
--- /dev/null
+++ b/catalog/multi_table_transaction_test.go
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package catalog
+
+import (
+ "context"
+ "testing"
+
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/table"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// stubCatalog is a TransactionalCatalog test double that records every
+// batch passed to CommitTransaction and always succeeds.
+type stubCatalog struct {
+	commits [][]table.TableCommit
+}
+
+// CommitTransaction appends the received batch to the recorded calls.
+func (sc *stubCatalog) CommitTransaction(_ context.Context, commits []table.TableCommit) error {
+	recorded := append(sc.commits, commits)
+	sc.commits = recorded
+
+	return nil
+}
+
+// mtxTestTable builds an in-memory format-version-2 table identified by
+// {ns, name} with a single required int64 "id" column. The test fails
+// immediately if the metadata cannot be created.
+func mtxTestTable(t *testing.T, ns, name string) *table.Table {
+	t.Helper()
+
+	idField := iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true}
+	props := iceberg.Properties{table.PropertyFormatVersion: "2"}
+
+	meta, err := table.NewMetadata(
+		iceberg.NewSchema(0, idField),
+		iceberg.UnpartitionedSpec,
+		table.UnsortedSortOrder,
+		"s3://bucket/test",
+		props,
+	)
+	require.NoError(t, err)
+
+	ident := table.Identifier{ns, name}
+
+	return table.New(ident, meta, "", nil, nil)
+}
+
+func TestMultiTableTransaction(t *testing.T) {
+ stub := &stubCatalog{}
+ mtx := &MultiTableTransaction{cat: stub}
+
+ // Add two tables with changes.
+ tx1 := mtxTestTable(t, "db", "t1").NewTransaction()
+ require.NoError(t, tx1.SetProperties(map[string]string{"k": "v"}))
+ require.NoError(t, mtx.AddTransaction(tx1))
+
+ tx2 := mtxTestTable(t, "db", "t2").NewTransaction()
+ require.NoError(t, tx2.SetProperties(map[string]string{"k": "v"}))
+ require.NoError(t, mtx.AddTransaction(tx2))
+
+ // Duplicate table is rejected.
+ assert.ErrorContains(t, mtx.AddTransaction(mtxTestTable(t, "db",
"t1").NewTransaction()), "duplicate table")
+
+ // Already-committed transaction is rejected.
+ tx3 := mtxTestTable(t, "db", "t3").NewTransaction()
+ tx3.MarkCommitted()
+ assert.ErrorContains(t, mtx.AddTransaction(tx3), "already been
committed")
+
+ // Commit sends both to the catalog and marks transactions done.
+ require.NoError(t, mtx.Commit(context.Background()))
+ require.Len(t, stub.commits, 1)
+ assert.Len(t, stub.commits[0], 2)
+
+ _, err := tx1.TableCommit()
+ assert.ErrorContains(t, err, "already been committed")
+
+ // Double commit and add-after-commit are rejected.
+ assert.ErrorContains(t, mtx.Commit(context.Background()), "already been
committed")
+ assert.ErrorContains(t, mtx.AddTransaction(mtxTestTable(t, "db",
"t4").NewTransaction()), "already been committed")
+}
+
+// TestMultiTableTransactionEmpty checks that committing without any
+// added transaction yields ErrEmptyCommitList.
+func TestMultiTableTransactionEmpty(t *testing.T) {
+	empty := &MultiTableTransaction{cat: &stubCatalog{}}
+
+	err := empty.Commit(context.Background())
+	assert.ErrorIs(t, err, ErrEmptyCommitList)
+}
+
+func TestNewMultiTableTransactionNonTransactional(t *testing.T) {
+ // nil doesn't implement TransactionalCatalog
+ _, err := NewMultiTableTransaction(nil)
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "does not support")
+}
diff --git a/catalog/rest/rest.go b/catalog/rest/rest.go
index 41ead570..dd181969 100644
--- a/catalog/rest/rest.go
+++ b/catalog/rest/rest.go
@@ -48,7 +48,10 @@ import (
"golang.org/x/oauth2/clientcredentials"
)
-var _ catalog.Catalog = (*Catalog)(nil)
+// Compile-time assertions that *Catalog satisfies both the base catalog
+// interface and the optional multi-table transaction interface.
+var (
+ _ catalog.Catalog = (*Catalog)(nil)
+ _ catalog.TransactionalCatalog = (*Catalog)(nil)
+)
const (
pageSizeKey contextKey = "page_size"
@@ -885,6 +888,69 @@ func (r *Catalog) CommitTable(ctx context.Context, ident
table.Identifier, requi
return ret.Metadata, ret.MetadataLoc, nil
}
+// CommitTransaction atomically commits changes to multiple tables in a
+// single POST to the transactions/commit endpoint. It implements
+// [catalog.TransactionalCatalog].
+//
+// The server applies all changes or none (all-or-nothing). On success
+// (204 No Content) the method returns nil. Callers must LoadTable
+// individually to obtain updated metadata.
+//
+// Before any request is sent, it returns [catalog.ErrEmptyCommitList]
+// for an empty commits slice and [catalog.ErrMissingIdentifier] if any
+// commit has an empty identifier or an identifier with an empty
+// component. Server responses are mapped as follows: 404 to
+// [catalog.ErrNoSuchTable], 409 to ErrCommitFailed, and 500, 502, 503
+// and 504 to ErrCommitStateUnknown.
+func (r *Catalog) CommitTransaction(ctx context.Context, commits []table.TableCommit) error {
+	if len(commits) == 0 {
+		return catalog.ErrEmptyCommitList
+	}
+
+	type tableChange struct {
+		Identifier   identifier          `json:"identifier"`
+		Requirements []table.Requirement `json:"requirements"`
+		Updates      []table.Update      `json:"updates"`
+	}
+
+	type payload struct {
+		TableChanges []tableChange `json:"table-changes"`
+	}
+
+	changes := make([]tableChange, len(commits))
+	for i, c := range commits {
+		// Validate everything up front so a malformed entry never
+		// results in a partially built request being sent.
+		if !hasValidCommitIdentifier(c.Identifier) {
+			return catalog.ErrMissingIdentifier
+		}
+
+		// nil slices would be encoded as JSON null; send empty arrays
+		// instead.
+		reqs := c.Requirements
+		if reqs == nil {
+			reqs = []table.Requirement{}
+		}
+		updates := c.Updates
+		if updates == nil {
+			updates = []table.Update{}
+		}
+
+		changes[i] = tableChange{
+			Identifier: identifier{
+				Namespace: catalog.NamespaceFromIdent(c.Identifier),
+				Name:      catalog.TableNameFromIdent(c.Identifier),
+			},
+			Requirements: reqs,
+			Updates:      updates,
+		}
+	}
+
+	_, err := doPostAllowNoContent[payload, struct{}](
+		ctx, r.baseURI, []string{"transactions", "commit"},
+		payload{TableChanges: changes}, r.cl,
+		map[int]error{
+			http.StatusNotFound:            catalog.ErrNoSuchTable,
+			http.StatusConflict:            ErrCommitFailed,
+			http.StatusInternalServerError: ErrCommitStateUnknown,
+			http.StatusBadGateway:          ErrCommitStateUnknown,
+			http.StatusServiceUnavailable:  ErrCommitStateUnknown,
+			http.StatusGatewayTimeout:      ErrCommitStateUnknown,
+		},
+		true,
+	)
+
+	return err
+}
+
+// hasValidCommitIdentifier reports whether ident has at least one
+// component and no component is the empty string.
+func hasValidCommitIdentifier(ident table.Identifier) bool {
+	if len(ident) == 0 {
+		return false
+	}
+
+	for _, part := range ident {
+		if part == "" {
+			return false
+		}
+	}
+
+	return true
+}
+
func (r *Catalog) RegisterTable(ctx context.Context, identifier
table.Identifier, metadataLoc string) (*table.Table, error) {
ns, tbl, err := splitIdentForPath(identifier)
if err != nil {
diff --git a/catalog/rest/rest_integration_test.go
b/catalog/rest/rest_integration_test.go
index ca156f62..c0a7d217 100644
--- a/catalog/rest/rest_integration_test.go
+++ b/catalog/rest/rest_integration_test.go
@@ -358,6 +358,101 @@ func (s *RestIntegrationSuite) TestWriteCommitTable() {
s.Equal(pqfile, entries[0].DataFile().FilePath())
}
+// writeParquetFile writes a two-row parquet file matching schema to
+// <table location>/data/<subpath>/test.parquet using the table's file
+// IO and returns the file's path. Any failure aborts the test.
+func (s *RestIntegrationSuite) writeParquetFile(tbl *table.Table, schema *iceberg.Schema, subpath string) string {
+	s.T().Helper()
+
+	arrowSchema, err := table.SchemaToArrowSchema(schema, nil, false, false)
+	s.Require().NoError(err)
+
+	rows := []string{`[
+		{"foo": "hello", "bar": 1, "baz": true},
+		{"foo": "world", "bar": 2, "baz": false}
+	]`}
+	data, err := array.TableFromJSON(memory.DefaultAllocator, arrowSchema, rows)
+	s.Require().NoError(err)
+	defer data.Release()
+
+	target, err := url.JoinPath(tbl.Location(), "data", subpath, "test.parquet")
+	s.Require().NoError(err)
+
+	writableFS := mustFS(s.T(), tbl).(io.WriteFileIO)
+	out, err := writableFS.Create(target)
+	s.Require().NoError(err)
+
+	writeErr := pqarrow.WriteTable(data, out, data.NumRows(), nil, pqarrow.DefaultWriterProps())
+	s.Require().NoError(writeErr)
+
+	return target
+}
+
+// TestMultiTableCommit creates two tables, stages one data file on each
+// through separate transactions, commits both atomically with a
+// MultiTableTransaction, and verifies that each reloaded table has a
+// snapshot with exactly one manifest that adds one data file.
+func (s *RestIntegrationSuite) TestMultiTableCommit() {
+	s.ensureNamespace()
+
+	const location = "s3://warehouse/iceberg"
+	identA := catalog.ToIdentifier(TestNamespaceIdent, "multi-txn-table-1")
+	identB := catalog.ToIdentifier(TestNamespaceIdent, "multi-txn-table-2")
+
+	tableA, err := s.cat.CreateTable(s.ctx, identA, tableSchemaSimple,
+		catalog.WithLocation(location+"/multi-txn-1"))
+	s.Require().NoError(err)
+	s.Require().NotNil(tableA)
+	defer func() {
+		s.Require().NoError(s.cat.DropTable(s.ctx, identA))
+	}()
+
+	tableB, err := s.cat.CreateTable(s.ctx, identB, tableSchemaSimple,
+		catalog.WithLocation(location+"/multi-txn-2"))
+	s.Require().NoError(err)
+	s.Require().NotNil(tableB)
+	defer func() {
+		s.Require().NoError(s.cat.DropTable(s.ctx, identB))
+	}()
+
+	// One parquet data file per table, removed again when the test ends.
+	fileA := s.writeParquetFile(tableA, tableSchemaSimple, "multi-txn-1")
+	defer mustFS(s.T(), tableA).Remove(fileA)
+
+	fileB := s.writeParquetFile(tableB, tableSchemaSimple, "multi-txn-2")
+	defer mustFS(s.T(), tableB).Remove(fileB)
+
+	// Stage the data file on each table without committing.
+	txA := tableA.NewTransaction()
+	s.Require().NoError(txA.AddFiles(s.ctx, []string{fileA}, nil, false))
+
+	txB := tableB.NewTransaction()
+	s.Require().NoError(txB.AddFiles(s.ctx, []string{fileB}, nil, false))
+
+	// Commit both staged changes in a single atomic request.
+	multi, err := catalog.NewMultiTableTransaction(s.cat)
+	s.Require().NoError(err)
+	s.Require().NoError(multi.AddTransaction(txA))
+	s.Require().NoError(multi.AddTransaction(txB))
+	s.Require().NoError(multi.Commit(s.ctx))
+
+	// verify reloads a table and checks its snapshot and manifests.
+	verify := func(ident table.Identifier) {
+		reloaded, err := s.cat.LoadTable(s.ctx, ident)
+		s.Require().NoError(err)
+		s.Require().NotNil(reloaded.CurrentSnapshot())
+
+		manifests := []iceberg.ManifestFile{}
+		for manifest, err := range reloaded.AllManifests(s.ctx) {
+			s.Require().NoError(err)
+			manifests = append(manifests, manifest)
+		}
+		s.Len(manifests, 1)
+		s.EqualValues(1, manifests[0].AddedDataFiles())
+	}
+
+	verify(identA)
+	verify(identB)
+}
+
func mustFS(t *testing.T, tbl *table.Table) io.IO {
r, err := tbl.FS(context.Background())
require.NoError(t, err)
diff --git a/catalog/rest/rest_test.go b/catalog/rest/rest_test.go
index ae8e16aa..8d223bf6 100644
--- a/catalog/rest/rest_test.go
+++ b/catalog/rest/rest_test.go
@@ -3000,3 +3000,160 @@ func (r *RestCatalogSuite)
TestUpdateTableErrCommitStateUnknown() {
})
}
}
+
+// TestCommitTransactionSuccess checks that a 204 response makes
+// CommitTransaction return nil and that the request body carries one
+// "table-changes" entry per commit with the expected identifier shape.
+func (r *RestCatalogSuite) TestCommitTransactionSuccess() {
+	r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, _ *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+		json.NewEncoder(w).Encode(map[string]any{
+			"access_token": TestToken,
+			"token_type":   "Bearer",
+			"expires_in":   3600,
+		})
+	})
+
+	var captured map[string]any
+	r.mux.HandleFunc("/v1/transactions/commit", func(w http.ResponseWriter, req *http.Request) {
+		r.Equal(http.MethodPost, req.Method)
+		r.NoError(json.NewDecoder(req.Body).Decode(&captured))
+		w.WriteHeader(http.StatusNoContent)
+	})
+
+	ctx := context.Background()
+	cat, err := rest.NewCatalog(ctx, "rest", r.srv.URL, rest.WithCredential(TestCreds))
+	r.Require().NoError(err)
+
+	// The capability must be discoverable through the base interface.
+	var asCatalog catalog.Catalog = cat
+	_, transactional := asCatalog.(catalog.TransactionalCatalog)
+	r.Require().True(transactional, "REST catalog must implement TransactionalCatalog")
+
+	commitFor := func(name string) table.TableCommit {
+		return table.TableCommit{
+			Identifier:   table.Identifier{"db", name},
+			Requirements: []table.Requirement{},
+			Updates:      []table.Update{},
+		}
+	}
+
+	err = cat.CommitTransaction(ctx, []table.TableCommit{commitFor("t1"), commitFor("t2")})
+	r.Require().NoError(err)
+
+	// Inspect the JSON document the server received.
+	r.Contains(captured, "table-changes")
+	tableChanges, isList := captured["table-changes"].([]any)
+	r.True(isList)
+	r.Len(tableChanges, 2)
+
+	firstChange := tableChanges[0].(map[string]any)
+	firstIdent := firstChange["identifier"].(map[string]any)
+	r.Equal("t1", firstIdent["name"])
+
+	namespace := firstIdent["namespace"].([]any)
+	r.Len(namespace, 1)
+	r.Equal("db", namespace[0])
+}
+
+// TestCommitTransactionEmptyList checks that an empty commit list is
+// rejected with catalog.ErrEmptyCommitList.
+func (r *RestCatalogSuite) TestCommitTransactionEmptyList() {
+	r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, _ *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+		json.NewEncoder(w).Encode(map[string]any{
+			"access_token": TestToken,
+			"token_type":   "Bearer",
+			"expires_in":   3600,
+		})
+	})
+
+	ctx := context.Background()
+	cat, err := rest.NewCatalog(ctx, "rest", r.srv.URL, rest.WithCredential(TestCreds))
+	r.Require().NoError(err)
+
+	noCommits := []table.TableCommit{}
+	r.ErrorIs(cat.CommitTransaction(ctx, noCommits), catalog.ErrEmptyCommitList)
+}
+
+// TestCommitTransactionMissingIdentifier checks that a commit without an
+// identifier is rejected with catalog.ErrMissingIdentifier.
+func (r *RestCatalogSuite) TestCommitTransactionMissingIdentifier() {
+	r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, _ *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+		json.NewEncoder(w).Encode(map[string]any{
+			"access_token": TestToken,
+			"token_type":   "Bearer",
+			"expires_in":   3600,
+		})
+	})
+
+	ctx := context.Background()
+	cat, err := rest.NewCatalog(ctx, "rest", r.srv.URL, rest.WithCredential(TestCreds))
+	r.Require().NoError(err)
+
+	unidentified := table.TableCommit{
+		Identifier:   nil,
+		Requirements: []table.Requirement{},
+		Updates:      []table.Update{},
+	}
+
+	err = cat.CommitTransaction(ctx, []table.TableCommit{unidentified})
+	r.ErrorIs(err, catalog.ErrMissingIdentifier)
+}
+
+// TestCommitTransactionConflict checks that a 409 response from the
+// commit endpoint surfaces as rest.ErrCommitFailed.
+func (r *RestCatalogSuite) TestCommitTransactionConflict() {
+	r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, _ *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+		json.NewEncoder(w).Encode(map[string]any{
+			"access_token": TestToken,
+			"token_type":   "Bearer",
+			"expires_in":   3600,
+		})
+	})
+
+	r.mux.HandleFunc("/v1/transactions/commit", func(w http.ResponseWriter, _ *http.Request) {
+		w.WriteHeader(http.StatusConflict)
+
+		errBody := map[string]any{
+			"message": "Conflict",
+			"type":    "CommitFailedException",
+			"code":    409,
+		}
+		json.NewEncoder(w).Encode(map[string]any{"error": errBody})
+	})
+
+	ctx := context.Background()
+	cat, err := rest.NewCatalog(ctx, "rest", r.srv.URL, rest.WithCredential(TestCreds))
+	r.Require().NoError(err)
+
+	single := table.TableCommit{
+		Identifier:   table.Identifier{"db", "t1"},
+		Requirements: []table.Requirement{},
+		Updates:      []table.Update{},
+	}
+
+	err = cat.CommitTransaction(ctx, []table.TableCommit{single})
+	r.ErrorIs(err, rest.ErrCommitFailed)
+}
+
+// TestCommitTransactionErrCommitStateUnknown checks that each of the
+// 500, 502, 503 and 504 responses surfaces as rest.ErrCommitStateUnknown.
+func (r *RestCatalogSuite) TestCommitTransactionErrCommitStateUnknown() {
+	// respondWith is the status the commit handler answers with; it is
+	// set before each sub-test runs.
+	var respondWith int
+
+	r.mux.HandleFunc("/v1/oauth/tokens", func(w http.ResponseWriter, _ *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+		json.NewEncoder(w).Encode(map[string]any{
+			"access_token": TestToken,
+			"token_type":   "Bearer",
+			"expires_in":   3600,
+		})
+	})
+
+	r.mux.HandleFunc("/v1/transactions/commit", func(w http.ResponseWriter, _ *http.Request) {
+		w.WriteHeader(respondWith)
+
+		errBody := map[string]any{
+			"message": http.StatusText(respondWith),
+			"type":    "ServerException",
+			"code":    respondWith,
+		}
+		json.NewEncoder(w).Encode(map[string]any{"error": errBody})
+	})
+
+	cat, err := rest.NewCatalog(context.Background(), "rest", r.srv.URL,
+		rest.WithCredential(TestCreds))
+	r.Require().NoError(err)
+
+	serverFailures := []int{
+		http.StatusInternalServerError,
+		http.StatusBadGateway,
+		http.StatusServiceUnavailable,
+		http.StatusGatewayTimeout,
+	}
+
+	for _, code := range serverFailures {
+		respondWith = code
+		r.Run(strconv.Itoa(code), func() {
+			single := table.TableCommit{
+				Identifier:   table.Identifier{"db", "t1"},
+				Requirements: []table.Requirement{},
+				Updates:      []table.Update{},
+			}
+
+			err := cat.CommitTransaction(context.Background(), []table.TableCommit{single})
+			r.Require().Error(err)
+			r.ErrorIs(err, rest.ErrCommitStateUnknown,
+				"%d should return ErrCommitStateUnknown, got: %v", code, err)
+		})
+	}
+}
diff --git a/table/commit.go b/table/commit.go
new file mode 100644
index 00000000..98df8463
--- /dev/null
+++ b/table/commit.go
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import "errors"
+
+// TableCommit holds the identifier, requirements, and updates for a single
+// table within a multi-table transaction. It is used with
+// [catalog.TransactionalCatalog.CommitTransaction] to atomically commit
+// changes across multiple tables.
+type TableCommit struct {
+ // Identifier names the table the changes apply to.
+ Identifier Identifier
+ // Requirements are the assertions submitted alongside Updates.
+ Requirements []Requirement
+ // Updates are the pending metadata changes for the table.
+ Updates []Update
+}
+
+// TableCommit returns a TableCommit representing the pending changes in this
+// transaction without actually committing them. This is intended for
+// multi-table transactions where several TableCommit values are collected
+// and submitted together via [catalog.TransactionalCatalog.CommitTransaction].
+//
+// Most callers should use [catalog.MultiTableTransaction] instead of calling
+// this method directly — it handles extraction, commit, and lifecycle
+// management automatically.
+//
+// The method automatically includes an AssertTableUUID requirement, matching
+// the behavior of [Transaction.Commit].
+//
+// TableCommit does not mark the transaction as committed — the caller is
+// responsible for either calling Commit (single-table) or submitting the
+// returned TableCommit via CommitTransaction (multi-table). After a
+// successful multi-table commit the caller should call MarkCommitted to
+// prevent accidental reuse.
+//
+// PostCommit hooks are NOT executed by this method. Because the multi-table
+// commit endpoint returns 204 No Content (no metadata), callers must
+// LoadTable after a successful CommitTransaction if they need updated state.
+func (t *Transaction) TableCommit() (TableCommit, error) {
+ t.mx.Lock()
+ defer t.mx.Unlock()
+
+ if t.committed {
+ return TableCommit{}, errors.New("transaction has already been
committed")
+ }
+
+ if len(t.meta.updates) == 0 {
+ return TableCommit{
+ Identifier: t.tbl.identifier,
+ Requirements: []Requirement{},
+ Updates: []Update{},
+ }, nil
+ }
+
+ reqs := make([]Requirement, len(t.reqs), len(t.reqs)+1)
+ copy(reqs, t.reqs)
+ reqs = append(reqs, AssertTableUUID(t.meta.uuid))
+
+ updates := make([]Update, len(t.meta.updates))
+ copy(updates, t.meta.updates)
+
+ return TableCommit{
+ Identifier: t.tbl.identifier,
+ Requirements: reqs,
+ Updates: updates,
+ }, nil
+}
+
+// MarkCommitted flags the transaction as committed so that it cannot be
+// used again. Call it once a multi-table commit submitted through
+// [catalog.TransactionalCatalog.CommitTransaction] has succeeded.
+func (t *Transaction) MarkCommitted() {
+	t.mx.Lock()
+	t.committed = true
+	t.mx.Unlock()
+}
diff --git a/table/commit_test.go b/table/commit_test.go
new file mode 100644
index 00000000..eb4a3240
--- /dev/null
+++ b/table/commit_test.go
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+ "testing"
+
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/table"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// newCommitTestTable builds an in-memory format-version-2 table named
+// db.test_table with a required int64 "id" column and an optional string
+// "data" column. The test fails immediately if metadata creation fails.
+func newCommitTestTable(t *testing.T) *table.Table {
+	t.Helper()
+
+	idCol := iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true}
+	dataCol := iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.PrimitiveTypes.String, Required: false}
+	props := iceberg.Properties{table.PropertyFormatVersion: "2"}
+
+	meta, err := table.NewMetadata(
+		iceberg.NewSchema(0, idCol, dataCol),
+		iceberg.UnpartitionedSpec,
+		table.UnsortedSortOrder,
+		"s3://bucket/test",
+		props,
+	)
+	require.NoError(t, err)
+
+	ident := table.Identifier{"db", "test_table"}
+
+	return table.New(ident, meta, "s3://bucket/test/metadata/v1.metadata.json", nil, nil)
+}
+
+func TestTableCommitFromEmptyTransaction(t *testing.T) {
+ tbl := newCommitTestTable(t)
+ tx := tbl.NewTransaction()
+
+ tc, err := tx.TableCommit()
+ require.NoError(t, err)
+
+ assert.Equal(t, table.Identifier{"db", "test_table"}, tc.Identifier)
+ assert.NotNil(t, tc.Requirements, "Requirements must be non-nil empty
slice for JSON serialization")
+ assert.NotNil(t, tc.Updates, "Updates must be non-nil empty slice for
JSON serialization")
+ assert.Empty(t, tc.Requirements)
+ assert.Empty(t, tc.Updates)
+}
+
+// TestTableCommitAfterCommitFails checks that TableCommit refuses to run
+// on a transaction that was marked committed.
+func TestTableCommitAfterCommitFails(t *testing.T) {
+	tx := newCommitTestTable(t).NewTransaction()
+	tx.MarkCommitted()
+
+	_, commitErr := tx.TableCommit()
+	require.Error(t, commitErr)
+	assert.Contains(t, commitErr.Error(), "already been committed")
+}
+
+func TestTableCommitIncludesAssertTableUUID(t *testing.T) {
+ tbl := newCommitTestTable(t)
+ tx := tbl.NewTransaction()
+
+ require.NoError(t, tx.SetProperties(map[string]string{"key": "value"}))
+
+ tc, err := tx.TableCommit()
+ require.NoError(t, err)
+
+ assert.NotEmpty(t, tc.Updates)
+
+ var hasUUIDAssert bool
+ for _, r := range tc.Requirements {
+ if r.GetType() == "assert-table-uuid" {
+ hasUUIDAssert = true
+
+ break
+ }
+ }
+ assert.True(t, hasUUIDAssert, "expected AssertTableUUID requirement")
+}
+
+// TestTableCommitDoesNotMarkTransactionCommitted checks that extracting
+// a TableCommit leaves the transaction usable.
+func TestTableCommitDoesNotMarkTransactionCommitted(t *testing.T) {
+	tx := newCommitTestTable(t).NewTransaction()
+	require.NoError(t, tx.SetProperties(map[string]string{"key": "value"}))
+
+	// Both extractions must succeed: the first one does not mark the
+	// transaction as committed.
+	_, firstErr := tx.TableCommit()
+	require.NoError(t, firstErr)
+
+	_, secondErr := tx.TableCommit()
+	require.NoError(t, secondErr)
+}
+
+// TestTableCommitReturnsCopies checks that the Requirements and Updates
+// slices of separately extracted TableCommit values do not share backing
+// storage.
+func TestTableCommitReturnsCopies(t *testing.T) {
+	tbl := newCommitTestTable(t)
+	tx := tbl.NewTransaction()
+
+	require.NoError(t, tx.SetProperties(map[string]string{"key": "value"}))
+
+	tc1, err := tx.TableCommit()
+	require.NoError(t, err)
+
+	tc2, err := tx.TableCommit()
+	require.NoError(t, err)
+
+	require.NotEmpty(t, tc1.Requirements)
+	require.NotEmpty(t, tc1.Updates)
+	require.Len(t, tc2.Requirements, len(tc1.Requirements))
+	require.Len(t, tc2.Updates, len(tc1.Updates))
+
+	// Overwrite elements in place. Reslicing tc1 would only change its
+	// local slice header and could never reveal shared backing storage.
+	tc1.Requirements[0] = nil
+	tc1.Updates[0] = nil
+
+	assert.NotNil(t, tc2.Requirements[0], "Requirements must not share backing storage")
+	assert.NotNil(t, tc2.Updates[0], "Updates must not share backing storage")
+}
+
+// TestMarkCommittedPreventsCommit checks that Commit fails on a
+// transaction that was marked committed.
+func TestMarkCommittedPreventsCommit(t *testing.T) {
+	tx := newCommitTestTable(t).NewTransaction()
+	tx.MarkCommitted()
+
+	_, commitErr := tx.Commit(t.Context())
+	require.Error(t, commitErr)
+	assert.Contains(t, commitErr.Error(), "already been committed")
+}