zeroshade commented on code in PR #925:
URL: https://github.com/apache/iceberg-go/pull/925#discussion_r3523598017
##########
catalog/sql/sql.go:
##########
@@ -1048,3 +1052,126 @@ func (c *Catalog) LoadView(ctx context.Context,
identifier table.Identifier) (vi
return loadedView.Metadata(), nil
}
+
+// CommitTransaction atomically commits changes to multiple tables in a
+// single database transaction. It implements [catalog.TransactionalCatalog].
+//
+// This method provides "commit-point atomicity". To avoid long-running DB
+// transactions involving I/O (like uploading metadata files), table state
+// is loaded and metadata files are written outside the write transaction.
+// Atomicity is guaranteed at the commit point within the database; concurrent
+// updates are detected using per-row optimistic concurrency control (OCC)
+// based on the metadata_location.
+//
+// If any table update fails due to an OCC conflict or database error, the
+// entire transaction is rolled back, ensuring all-or-nothing database updates.
+// Note that metadata files written during the preparation phase may remain
+// as orphaned files if the transaction is rolled back.
+//
+// On success the method returns nil. Callers must LoadTable individually
+// to obtain updated metadata.
+func (c *Catalog) CommitTransaction(ctx context.Context, commits
[]table.TableCommit) error {
+ if len(commits) == 0 {
+ return catalog.ErrEmptyCommitList
+ }
+
+ seen := make(map[string]struct{})
+
+ for _, commit := range commits {
+ if len(commit.Identifier) == 0 {
+ return catalog.ErrMissingIdentifier
+ }
+
+ key := strings.Join(commit.Identifier, ".")
+ if _, ok := seen[key]; ok {
+ return fmt.Errorf("duplicate table in commit list: %s",
key)
+ }
+ seen[key] = struct{}{}
+ }
+
+ // Phase 1: Load current state and stage all table updates.
+ // We do this outside the write transaction to minimize the time
+ // the DB transaction is held open.
+ type stagedCommit struct {
+ ident table.Identifier
+ current *table.Table
+ ns string
+ tblName string
+ staged *table.StagedTable
+ }
+
+ stagedCommits := make([]stagedCommit, 0, len(commits))
+ for _, commit := range commits {
+ ns := catalog.NamespaceFromIdent(commit.Identifier)
+ tblName := catalog.TableNameFromIdent(commit.Identifier)
+
+ current, err := c.LoadTable(ctx, commit.Identifier)
+ if err != nil {
+ return err
+ }
+
+ staged, err := internal.UpdateAndStageTable(ctx, current,
commit.Identifier, commit.Requirements, commit.Updates, c)
+ if err != nil {
+ return err
+ }
+
+ // Skip tables with no actual changes.
+ if current != nil &&
staged.Metadata().Equals(current.Metadata()) {
+ continue
+ }
+
+ // Write the metadata file before the DB transaction.
+ if err := internal.WriteMetadata(ctx, staged.Metadata(),
staged.MetadataLocation(), staged.Properties()); err != nil {
+ return err
+ }
+
+ stagedCommits = append(stagedCommits, stagedCommit{
+ ident: commit.Identifier,
+ current: current,
+ ns: strings.Join(ns, "."),
+ tblName: tblName,
+ staged: staged,
+ })
+ }
+
+ if len(stagedCommits) == 0 {
+ return nil // all tables had no changes
+ }
+
+ // Sort stagedCommits by identifier to prevent deadlocks in databases
with
+ // row-level locking (e.g., Postgres, MySQL) when multiple transactions
Review Comment:
This implementation now calls out Postgres/MySQL deadlock and row-locking
behavior, but the added transactional conflict/rollback test coverage is
SQLite-only. Please add at least one Postgres- or MySQL-backed conflict/rollback
test that exercises these claims, or narrow the code comment's claims to the
SQLite-backed behavior that is currently covered.
##########
catalog/sql/sql.go:
##########
@@ -1048,3 +1052,126 @@ func (c *Catalog) LoadView(ctx context.Context,
identifier table.Identifier) (vi
return loadedView.Metadata(), nil
}
+
+// CommitTransaction atomically commits changes to multiple tables in a
+// single database transaction. It implements [catalog.TransactionalCatalog].
+//
+// This method provides "commit-point atomicity". To avoid long-running DB
+// transactions involving I/O (like uploading metadata files), table state
+// is loaded and metadata files are written outside the write transaction.
+// Atomicity is guaranteed at the commit point within the database; concurrent
+// updates are detected using per-row optimistic concurrency control (OCC)
+// based on the metadata_location.
+//
+// If any table update fails due to an OCC conflict or database error, the
+// entire transaction is rolled back, ensuring all-or-nothing database updates.
+// Note that metadata files written during the preparation phase may remain
+// as orphaned files if the transaction is rolled back.
+//
+// On success the method returns nil. Callers must LoadTable individually
+// to obtain updated metadata.
+func (c *Catalog) CommitTransaction(ctx context.Context, commits
[]table.TableCommit) error {
+ if len(commits) == 0 {
+ return catalog.ErrEmptyCommitList
+ }
+
+ seen := make(map[string]struct{})
+
+ for _, commit := range commits {
+ if len(commit.Identifier) == 0 {
+ return catalog.ErrMissingIdentifier
+ }
+
+ key := strings.Join(commit.Identifier, ".")
+ if _, ok := seen[key]; ok {
+ return fmt.Errorf("duplicate table in commit list: %s",
key)
+ }
+ seen[key] = struct{}{}
+ }
+
+ // Phase 1: Load current state and stage all table updates.
+ // We do this outside the write transaction to minimize the time
+ // the DB transaction is held open.
+ type stagedCommit struct {
+ ident table.Identifier
+ current *table.Table
+ ns string
+ tblName string
+ staged *table.StagedTable
+ }
+
+ stagedCommits := make([]stagedCommit, 0, len(commits))
+ for _, commit := range commits {
+ ns := catalog.NamespaceFromIdent(commit.Identifier)
+ tblName := catalog.TableNameFromIdent(commit.Identifier)
+
+ current, err := c.LoadTable(ctx, commit.Identifier)
+ if err != nil {
+ return err
+ }
+
+ staged, err := internal.UpdateAndStageTable(ctx, current,
commit.Identifier, commit.Requirements, commit.Updates, c)
Review Comment:
This still calls the old helper signature. Current main has
`internal.UpdateAndStageTable(ctx, catprops, current, ident, reqs, updates,
cat)`, so this will not compile after rebase. Please pass the catalog
properties and current commit fields, e.g. `internal.UpdateAndStageTable(ctx,
c.props, current, commit.Identifier, commit.Requirements, commit.Updates, c)`.
The metadata write just below has the same stale-signature issue and should use
`internal.WriteMetadata(ctx, staged.Table)`.
##########
catalog/catalog.go:
##########
@@ -62,6 +62,7 @@ var (
ErrViewAlreadyExists = errors.New("view already exists")
ErrEmptyCommitList = errors.New("commit list must not be empty")
ErrMissingIdentifier = errors.New("every table commit must have a
valid identifier")
+ ErrCommitFailed = errors.New("commit failed")
Review Comment:
Please do not add a separate catalog-level `ErrCommitFailed` sentinel.
Current main already uses the canonical retryable `table.ErrCommitFailed` for
the table-commit retry contract, including REST and other catalogs. SQL
multi-table conflicts should wrap `table.ErrCommitFailed` directly, and tests
should assert `errors.Is(err, table.ErrCommitFailed)`, otherwise callers will
not see the same retry/error semantics.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]