This is an automated email from the ASF dual-hosted git repository.

laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 4bf2ece3 fix(occ): manifest list rebuild (#982)
4bf2ece3 is described below

commit 4bf2ece3cd0c6855546256ead0f16a7115da9a13
Author: Masa <[email protected]>
AuthorDate: Fri May 8 04:42:51 2026 +1000

    fix(occ): manifest list rebuild (#982)
    
    fix(occ): rebuild manifest list on OCC retry to inherit concurrent
    writes
    
    Fixes #976
    ## Problem
    
    When `doCommit` retries after an OCC conflict, it re-uses the stale
    snapshot that was built before the first attempt. That snapshot's
    manifest list was written against the original parent and does not
    include any data files committed by concurrent writers in the meantime.
    
    On catalogs that perform server-side snapshot validation (e.g. AWS S3
    Tables), this causes **silent data loss**: the retried commit succeeds
    (HTTP 200) but the stale manifest list effectively drops the concurrent
    writer's files from the table history.
    
    ## Fix
    
    Each `snapshotProducer` now records which manifests it wrote itself
    (`ownManifests` — those not inherited from the original parent). A
    `rebuildManifestList` closure is attached to the `addSnapshotUpdate` and
    called by `doCommit` on every retry. The closure:
    
    1. Loads the fresh branch head's manifest list (concurrent manifests).
    2. Concatenates `ownManifests` with the fresh inherited manifests.
    3. Writes a new manifest list file with a retry-attempt suffix so each
    attempt gets a unique path.
    4. Adjusts `sequence-number` relative to the fresh parent.
    
    After a successful commit, `doCommit` deletes the manifest list files
    produced by superseded retry attempts (orphan cleanup).
    
    ## Files changed
    
    - `table/snapshot_producers.go`: `computeOwnManifests`, `rebuildFn`
    closure
    - `table/updates.go`: `ownManifests` / `rebuildManifestList` fields on
    `addSnapshotUpdate`, propagated through `Apply`
    - `table/table.go`: `rebuildSnapshotUpdates` helper, orphan cleanup
    after commit
    - `table/rebuild_manifest_test.go`: unit tests for
    `rebuildSnapshotUpdates`
    
    ---------
    
    Signed-off-by: mzzz-zzm <[email protected]>
    Signed-off-by: masa hoashi <[email protected]>
    Co-authored-by: masa hoashi <[email protected]>
---
 table/add_snapshot_update_test.go       | 181 ++++++++++
 table/commit_retry_test.go              | 591 ++++++++++++++++++++++++++++++++
 table/metadata.go                       |  31 +-
 table/metadata_builder_internal_test.go |   4 +-
 table/occ_scenario_test.go              | 257 ++++++++++++++
 table/rebuild_manifest_test.go          | 515 ++++++++++++++++++++++++++++
 table/snapshot_producers.go             | 169 ++++++++-
 table/snapshot_producers_test.go        |  87 ++++-
 table/table.go                          | 128 ++++++-
 table/updates.go                        |  20 +-
 10 files changed, 1965 insertions(+), 18 deletions(-)

diff --git a/table/add_snapshot_update_test.go b/table/add_snapshot_update_test.go
new file mode 100644
index 00000000..616ef035
--- /dev/null
+++ b/table/add_snapshot_update_test.go
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+// ---------------------------------------------------------------------------
+// addSnapshotUpdate.Apply / MetadataBuilder.AddSnapshotUpdate
+//
+// These tests pin the contract that runtime-only fields on *addSnapshotUpdate
+// (ownManifests, rebuildManifestList) survive Apply. The previous
+// implementation called builder.AddSnapshot(u.Snapshot), which constructed a
+// fresh *addSnapshotUpdate, then copied the runtime fields onto
+// builder.updates[n-1] via a back-door. That back-door silently broke if
+// MetadataBuilder.AddSnapshot ever appended a helper update after the
+// snapshot one, dropping the rebuild closure with no error or log — the
+// "silent fix can be bypassed" shape reviewer laskoviymishka flagged.
+//
+// The current implementation routes through MetadataBuilder.AddSnapshotUpdate,
+// which appends the supplied update verbatim. The tests below assert IDENTITY
+// (assert.Same), not just field equality, so a future regression that
+// reintroduces NewAddSnapshotUpdate(snapshot) inside AddSnapshotUpdate would
+// fail here.
+// ---------------------------------------------------------------------------
+
+// snapshotForApplyTest returns a v2-compatible Snapshot with the given ID.
+// Its SequenceNumber (2) is greater than baseMetaJSON's last-sequence-number
+// (1) so AddSnapshot's monotonicity check passes.
+func snapshotForApplyTest(id int64) *Snapshot {
+       return &Snapshot{
+               SnapshotID:     id,
+               SequenceNumber: 2,
+               TimestampMs:    1_700_000_000_000,
+               ManifestList:   "s3://bucket/snap.avro",
+               Summary:        &Summary{Operation: OpAppend},
+       }
+}
+
+// findStoredSnapshotUpdate scans builder.updates from the end and returns the
+// last *addSnapshotUpdate, or nil if none exists. Tests use this to inspect
+// what was stored after AddSnapshot / AddSnapshotUpdate / Apply.
+func findStoredSnapshotUpdate(b *MetadataBuilder) *addSnapshotUpdate {
+       for i := len(b.updates) - 1; i >= 0; i-- {
+               if su, ok := b.updates[i].(*addSnapshotUpdate); ok {
+                       return su
+               }
+       }
+
+       return nil
+}
+
+// TestAddSnapshotUpdate_PreservesIdentity verifies that AddSnapshotUpdate
+// stores the supplied *addSnapshotUpdate verbatim in builder.updates rather
+// than constructing a fresh one. Identity preservation is what makes the
+// back-door unnecessary; assert.Same is intentional — equality of fields
+// would mask a regression that constructs a fresh update with the same
+// values but different identity.
+func TestAddSnapshotUpdate_PreservesIdentity(t *testing.T) {
+       b := buildFromBase(t)
+       u := NewAddSnapshotUpdate(snapshotForApplyTest(100))
+
+       require.NoError(t, b.AddSnapshotUpdate(u))
+
+       stored := findStoredSnapshotUpdate(b)
+       require.NotNil(t, stored, "builder.updates must contain an *addSnapshotUpdate")
+       assert.Same(t, u, stored,
+               "AddSnapshotUpdate must store the supplied *addSnapshotUpdate verbatim; "+
+                       "constructing a fresh object reintroduces the back-door failure mode")
+}
+
+// TestAddSnapshotUpdate_PreservesRebuildClosure verifies that the rebuild
+// closure attached to the supplied update survives AddSnapshotUpdate (the
+// path Apply routes through). Without identity preservation, the closure
+// would be silently dropped — the failure mode flagged in review on PR #982.
+func TestAddSnapshotUpdate_PreservesRebuildClosure(t *testing.T) {
+       b := buildFromBase(t)
+
+       called := false
+       u := NewAddSnapshotUpdate(snapshotForApplyTest(101))
+       u.rebuildManifestList = func(_ context.Context, _ Metadata, _ *Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+               called = true
+
+               return u.Snapshot, nil
+       }
+       u.ownManifests = []iceberg.ManifestFile{}
+
+       require.NoError(t, b.AddSnapshotUpdate(u))
+
+       stored := findStoredSnapshotUpdate(b)
+       require.NotNil(t, stored)
+       require.Same(t, u, stored, "stored update must be the supplied instance, not a copy")
+       require.NotNil(t, stored.rebuildManifestList,
+               "rebuildManifestList closure must survive Apply; dropping it silently disables the OCC rebuild path")
+       assert.NotNil(t, stored.ownManifests, "ownManifests slice must survive Apply")
+
+       // Invoke the closure to prove it is the original function value, not
+       // a re-zeroed nil-equivalent that happens to compile.
+       _, err := stored.rebuildManifestList(t.Context(), nil, nil, nil, 0)
+       require.NoError(t, err)
+       assert.True(t, called, "stored closure must be the original; a fresh-object regression would lose it")
+}
+
+// TestAddSnapshotUpdate_ApplyRoutesThroughAddSnapshotUpdate verifies that the
+// Update interface entry point (Apply) preserves identity. This is the path
+// exercised by Transaction.Commit -> u.Apply(t.meta), so a regression here
+// would break the production OCC retry path.
+func TestAddSnapshotUpdate_ApplyRoutesThroughAddSnapshotUpdate(t *testing.T) {
+       b := buildFromBase(t)
+
+       u := NewAddSnapshotUpdate(snapshotForApplyTest(102))
+       u.rebuildManifestList = func(_ context.Context, _ Metadata, _ *Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+               return u.Snapshot, nil
+       }
+
+       require.NoError(t, u.Apply(b))
+
+       stored := findStoredSnapshotUpdate(b)
+       require.NotNil(t, stored)
+       assert.Same(t, u, stored,
+               "Apply must route through AddSnapshotUpdate so identity (and runtime fields) is preserved")
+       require.NotNil(t, stored.rebuildManifestList,
+               "rebuildManifestList must survive Apply via the Update interface")
+}
+
+// TestAddSnapshotUpdate_NilNoOp verifies defensive handling: a nil update or
+// a nil-Snapshot update is a no-op rather than a panic. This mirrors the
+// behavior of AddSnapshot(nil) which existed before this refactor.
+func TestAddSnapshotUpdate_NilNoOp(t *testing.T) {
+       b := buildFromBase(t)
+       updatesBefore := len(b.updates)
+
+       require.NoError(t, b.AddSnapshotUpdate(nil),
+               "nil *addSnapshotUpdate must be a no-op")
+       require.NoError(t, b.AddSnapshotUpdate(&addSnapshotUpdate{}),
+               "update with nil Snapshot must be a no-op (matches AddSnapshot(nil))")
+
+       assert.Equal(t, updatesBefore, len(b.updates),
+               "no-op calls must not append to builder.updates")
+}
+
+// TestAddSnapshot_PreservedForNonRebuildCallers verifies that the existing
+// AddSnapshot(*Snapshot) entry point — used by ~30 call sites across the
+// codebase — continues to work unchanged after the refactor: the snapshot is
+// stored and no rebuild closure is attached. Breakage here would be wide.
+func TestAddSnapshot_PreservedForNonRebuildCallers(t *testing.T) {
+       b := buildFromBase(t)
+       snap := snapshotForApplyTest(103)
+
+       require.NoError(t, b.AddSnapshot(snap))
+
+       // AddSnapshot builds its own *addSnapshotUpdate internally, so the
+       // stored update must carry the snapshot but NOT a rebuildManifestList.
+       stored := findStoredSnapshotUpdate(b)
+       require.NotNil(t, stored)
+       assert.Equal(t, snap.SnapshotID, stored.Snapshot.SnapshotID)
+       assert.Nil(t, stored.rebuildManifestList,
+               "AddSnapshot path must produce updates with no rebuild closure")
+}
diff --git a/table/commit_retry_test.go b/table/commit_retry_test.go
index ab976a12..3d93ae67 100644
--- a/table/commit_retry_test.go
+++ b/table/commit_retry_test.go
@@ -22,6 +22,8 @@ import (
        "errors"
        "fmt"
        "path/filepath"
+       "strconv"
+       "strings"
        "sync/atomic"
        "testing"
        "time"
@@ -32,6 +34,47 @@ import (
        "github.com/stretchr/testify/require"
 )
 
+// readOnlyIO implements iceio.IO but NOT iceio.WriteFileIO.
+// Used to verify that doCommit fails early when the FS cannot write.
+type readOnlyIO struct{}
+
+func (readOnlyIO) Open(_ string) (iceio.File, error) { return nil, errors.New("readOnlyIO: no files") }
+func (readOnlyIO) Remove(_ string) error             { return errors.New("readOnlyIO: read-only") }
+
+// sequentialCatalog returns a predetermined error per CommitTable attempt.
+// Attempts past the end of errs (or with a nil entry) commit successfully.
+// When loadMeta is set, LoadTable returns that metadata instead of c.metadata.
+type sequentialCatalog struct {
+       metadata Metadata
+       loadMeta Metadata // optional: returned by LoadTable if non-nil
+       errs     []error
+       attempts atomic.Int32
+}
+
+func (c *sequentialCatalog) LoadTable(_ context.Context, ident Identifier) (*Table, error) {
+       m := c.metadata
+       if c.loadMeta != nil {
+               m = c.loadMeta
+       }
+
+       return New(ident, m, "",
+               func(context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil }, c), nil
+}
+
+func (c *sequentialCatalog) CommitTable(_ context.Context, _ Identifier, _ []Requirement, updates []Update) (Metadata, string, error) {
+       n := int(c.attempts.Add(1)) - 1 // 0-indexed
+       if n < len(c.errs) && c.errs[n] != nil {
+               return nil, "", c.errs[n]
+       }
+       meta, err := UpdateTableMetadata(c.metadata, updates, "")
+       if err != nil {
+               return nil, "", err
+       }
+       c.metadata = meta
+
+       return meta, "", nil
+}
+
 // flakyCatalog commits successfully only on a specified attempt number.
 // Earlier attempts return the given error.
 type flakyCatalog struct {
@@ -266,3 +309,551 @@ func TestReadRetryConfig_ClampsNegativeProperties(t *testing.T) {
        assert.Equal(t, uint(CommitMaxRetryWaitMsDefault), cfg.maxWaitMs)
        assert.Equal(t, uint(CommitTotalRetryTimeoutMsDefault), 
cfg.totalTimeoutMs)
 }
+
+// ---------------------------------------------------------------------------
+// Fix 5 — mandatory WriteFileIO check at top of doCommit
+// ---------------------------------------------------------------------------
+
+// TestDoCommit_NonWriteFileIOReturnsError verifies that doCommit fails
+// immediately when the table's file system does not implement WriteFileIO.
+// A silent skip would reuse the stale manifest list — exactly the bug
+// this PR was designed to fix.
+func TestDoCommit_NonWriteFileIOReturnsError(t *testing.T) {
+       cat := &flakyCatalog{}
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+       meta, err := NewMetadata(schema, iceberg.UnpartitionedSpec, UnsortedSortOrder, "file:///tmp/rotest",
+               iceberg.Properties{PropertyFormatVersion: "2"})
+       require.NoError(t, err)
+       cat.metadata = meta
+
+       tbl := New(
+               Identifier{"db", "ro-test"},
+               meta, "file:///tmp/rotest/v1.metadata.json",
+               func(context.Context) (iceio.IO, error) { return readOnlyIO{}, nil },
+               cat,
+       )
+
+       _, doErr := tbl.doCommit(t.Context(), nil, nil)
+       require.Error(t, doErr, "doCommit must fail when FS does not implement WriteFileIO")
+       require.ErrorIs(t, doErr, ErrWriteIORequired,
+               "doCommit must wrap ErrWriteIORequired so callers can detect this condition with errors.Is")
+       assert.Equal(t, int32(0), cat.attempts.Load(),
+               "CommitTable must not be called when FS check fails")
+}
+
+// ---------------------------------------------------------------------------
+// Fix 6 — orphan cleanup via defer
+// ---------------------------------------------------------------------------
+
+// newOCCTable creates a table that uses the given wfs for its FS and the given
+// catalog for commits. meta should include retry-config properties so that
+// doCommit's retry loop allows at least one retry.
+func newOCCTable(t *testing.T, meta Metadata, wfs iceio.WriteFileIO, cat CatalogIO) *Table {
+       t.Helper()
+
+       return New(
+               Identifier{"db", "occ-cleanup-test"},
+               meta,
+               "mem://default/table-location/metadata/v1.metadata.json",
+               func(context.Context) (iceio.IO, error) { return wfs, nil },
+               cat,
+       )
+}
+
+// newMemIOWithRetryMeta creates a test memIO and a matching table Metadata that
+// includes retry-config properties, so doCommit's retry loop allows retries.
+// The location matches createTestTransactionWithMemIO so they share the same
+// memIO for writing manifest files.
+func newMemIOWithRetryMeta(t *testing.T, spec iceberg.PartitionSpec) (*memIO, Metadata) {
+       t.Helper()
+       wfs := newMemIO(1<<20, nil)
+       schema := simpleSchema()
+       meta, err := NewMetadata(schema, &spec, UnsortedSortOrder, "mem://default/table-location",
+               iceberg.Properties{
+                       CommitNumRetriesKey:          "3",
+                       CommitMinRetryWaitMsKey:      "1",
+                       CommitMaxRetryWaitMsKey:      "2",
+                       CommitTotalRetryTimeoutMsKey: "60000",
+               })
+       require.NoError(t, err, "new metadata")
+
+       return wfs, meta
+}
+
+// TestDoCommit_OrphanCleanedOnSuccess verifies that manifest-list files
+// orphaned by OCC retries are removed after a successful commit. These files
+// are written during rebuild and must not leak on the happy path.
+func TestDoCommit_OrphanCleanedOnSuccess(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       // Build a transaction from the retry-enabled meta and commit via the producer.
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, "mem://default/table-location/data/f.parquet", nil))
+
+       updates, reqs, err := sp.commit(t.Context())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+
+       // R3: assert the producer actually wrote the manifest list (real flow,
+       // not pre-populated placeholder). The orphan/cleanup contract is only
+       // meaningful when there is a real file to clean up.
+       require.Contains(t, wfs.files, originalManifestList,
+               "producer.commit() must persist the manifest list via WriteFileIO before doCommit runs")
+
+       // Catalog: fail once with ErrCommitFailed (triggers rebuild that orphans
+       // originalManifestList), then succeed.
+       cat := &sequentialCatalog{
+               metadata: meta,
+               errs:     []error{ErrCommitFailed},
+       }
+       tbl = newOCCTable(t, meta, wfs, cat)
+
+       _, err = tbl.doCommit(t.Context(), updates, reqs, withCommitBranch(MainBranch))
+       require.NoError(t, err, "doCommit must succeed on the second attempt")
+
+       _, stillExists := wfs.files[originalManifestList]
+       assert.False(t, stillExists,
+               "orphaned manifest list must be removed after successful commit")
+
+       // R3 (latest review): prove the LIVE committed path was preserved, not
+       // just that something got deleted. The catalog's CurrentSnapshot is the
+       // rebuilt one; its ManifestList must (a) differ from the orphan, and
+       // (b) still be present in the filesystem. A regression that flipped the
+       // cleanup loop to delete the committed path instead of the orphan would
+       // fail (b).
+       committedSnap := cat.metadata.CurrentSnapshot()
+       require.NotNil(t, committedSnap, "catalog must record the committed snapshot")
+       committedManifestList := committedSnap.ManifestList
+       require.NotEqual(t, originalManifestList, committedManifestList,
+               "committed manifest list must be the rebuilt path, not the original")
+       require.Contains(t, wfs.files, committedManifestList,
+               "live committed manifest list must be preserved by the cleanup defer")
+}
+
+// TestDoCommit_OrphanNotCleanedOnUnknownError verifies that manifest-list
+// files are NOT removed when CommitTable returns an unknown non-ErrCommitFailed
+// error (5xx / gateway timeout). In that case the catalog may have silently
+// accepted the commit, meaning one of the "orphaned" files is actually the
+// live snapshot. Deleting it would permanently corrupt the table.
+func TestDoCommit_OrphanNotCleanedOnUnknownError(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, "mem://default/table-location/data/f.parquet", nil))
+
+       updates, reqs, err := sp.commit(t.Context())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+
+       require.Contains(t, wfs.files, originalManifestList,
+               "producer.commit() must persist the manifest list via WriteFileIO before doCommit runs")
+
+       unknown5xxErr := errors.New("simulated 5xx: internal server error")
+       // Catalog: fail once (ErrCommitFailed → rebuild → orphan created),
+       // then return a 5xx (non-ErrCommitFailed → cleanupOrphans=false).
+       cat := &sequentialCatalog{
+               metadata: meta,
+               errs:     []error{ErrCommitFailed, unknown5xxErr},
+       }
+       tbl = newOCCTable(t, meta, wfs, cat)
+
+       _, err = tbl.doCommit(t.Context(), updates, reqs, withCommitBranch(MainBranch))
+       require.ErrorIs(t, err, unknown5xxErr, "5xx error must propagate")
+
+       _, stillExists := wfs.files[originalManifestList]
+       assert.True(t, stillExists,
+               "orphaned manifest list must NOT be removed when commit outcome is unknown (5xx)")
+
+       // R3 (latest review): on unknown-state, EVERY rebuild output must be
+       // preserved — any of them might be the snapshot the catalog silently
+       // accepted. Walk wfs.files and assert there are >=2 distinct manifest-list
+       // entries (the original + at least one rebuild). A regression that
+       // flipped cleanupOrphans=true on this path would leave only the original.
+       manifestListCount := 0
+       for name := range wfs.files {
+               if strings.HasSuffix(name, ".avro") && strings.Contains(name, "snap-") {
+                       manifestListCount++
+               }
+       }
+       require.GreaterOrEqual(t, manifestListCount, 2,
+               "on unknown 5xx every rebuild's manifest list must be preserved (one may be the live commit)")
+}
+
+// TestDoCommit_OrphanCleanedOnCommitDiverged verifies that manifest-list files
+// orphaned by rebuild attempts are removed when ErrCommitDiverged is returned
+// by a conflict validator. Diverged commits are terminal (no retry), and since
+// neither of the orphaned files was ever accepted by the catalog, they are safe
+// to delete. The defer cleanup runs with cleanupOrphans=true on this path.
+func TestDoCommit_OrphanCleanedOnCommitDiverged(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       // freshMeta has a snapshot on MainBranch so validators run on the retry attempt.
+       freshID := int64(42)
+       freshMeta := newConflictTestMetadataWithProps(t, &freshID, iceberg.Properties{
+               CommitNumRetriesKey:          "3",
+               CommitMinRetryWaitMsKey:      "1",
+               CommitMaxRetryWaitMsKey:      "2",
+               CommitTotalRetryTimeoutMsKey: "60000",
+       })
+
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, "mem://default/table-location/data/f.parquet", nil))
+
+       updates, reqs, err := sp.commit(t.Context())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+
+       require.Contains(t, wfs.files, originalManifestList,
+               "producer.commit() must persist the manifest list via WriteFileIO before doCommit runs")
+
+       // Catalog: fail once (ErrCommitFailed → rebuild → orphan created).
+       // LoadTable returns freshMeta (has branch snapshot → validators run on retry).
+       // Validator returns ErrCommitDiverged immediately.
+       cat := &sequentialCatalog{
+               metadata: meta,
+               loadMeta: freshMeta,
+               errs:     []error{ErrCommitFailed},
+       }
+       tbl = newOCCTable(t, meta, wfs, cat)
+
+       divergedValidator := func(*conflictContext) error { return ErrCommitDiverged }
+
+       _, err = tbl.doCommit(t.Context(), updates, reqs,
+               withCommitBranch(MainBranch),
+               withCommitValidators(divergedValidator),
+       )
+       require.ErrorIs(t, err, ErrCommitDiverged)
+
+       // The defer must have fired with cleanupOrphans=true and removed the orphan.
+       _, stillExists := wfs.files[originalManifestList]
+       assert.False(t, stillExists,
+               "orphaned manifest list must be removed even on ErrCommitDiverged: "+
+                       "the file was never accepted by the catalog so it is safe to delete")
+}
+
+// TestDoCommit_OrphanCleanedOnRetriesExhausted verifies that when every retry
+// attempt fails with ErrCommitFailed and the loop exits with the budget
+// exhausted, the defer still fires with cleanupOrphans=true. None of the
+// orphaned manifest-list files were ever accepted by the catalog, so they are
+// safe to delete on this terminal exit.
+func TestDoCommit_OrphanCleanedOnRetriesExhausted(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, "mem://default/table-location/data/f.parquet", nil))
+
+       updates, reqs, err := sp.commit(t.Context())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+
+       require.Contains(t, wfs.files, originalManifestList,
+               "producer.commit() must persist the manifest list via WriteFileIO before doCommit runs")
+
+       // numRetries=3 → 4 attempts; every attempt fails with ErrCommitFailed.
+       cat := &sequentialCatalog{
+               metadata: meta,
+               errs:     []error{ErrCommitFailed, ErrCommitFailed, ErrCommitFailed, ErrCommitFailed},
+       }
+       tbl = newOCCTable(t, meta, wfs, cat)
+
+       _, err = tbl.doCommit(t.Context(), updates, reqs, withCommitBranch(MainBranch))
+       require.ErrorIs(t, err, ErrCommitFailed,
+               "retries exhausted: terminal error must be ErrCommitFailed")
+
+       _, stillExists := wfs.files[originalManifestList]
+       assert.False(t, stillExists,
+               "orphaned manifest list must be removed when retries are exhausted with ErrCommitFailed: "+
+                       "none of the retry attempts were accepted, so all orphans are safe to delete")
+}
+
+// ---------------------------------------------------------------------------
+// Fix 7 (R4) — retry-progression: every fix exercised end-to-end
+// ---------------------------------------------------------------------------
+
+// progressingRebuildCatalog grafts a real peer commit (with a readable
+// manifest-list file in wfs.files) onto its tracked metadata between every
+// failed retry, then accepts on the final attempt. Each peer commit:
+//
+//   - advances LastSequenceNumber and the branch head,
+//   - publishes a Summary increment ("total-data-files"+1, etc.) so the
+//     next retry's rebuild closure observes an evolving freshParent.Summary,
+//   - writes a real (empty) manifest-list file at a unique path so
+//     freshParent.Manifests(fio) succeeds inside rebuildFn.
+//
+// This is the helper reviewer R4 asked for: a single test that drives ≥2
+// ErrCommitFailed retries with a freshMeta that progresses between attempts,
+// so the rebuild closure is exercised for real (not just the retry-loop
+// refresh path with nil updates). The existing headTrackingCatalog cannot
+// do this — it only advances metadata on a SUCCESSFUL CommitTable call.
+type progressingRebuildCatalog struct {
+       metadata  Metadata
+       wfs       *memIO
+       location  string
+       branch    string
+       failTimes int
+
+       commitTableCalls atomic.Int32
+
+       // Per-call captures (used by assertions).
+       loadedLastSeqNums     []int64  // freshMeta.LastSequenceNumber() at each LoadTable
+       observedManifestLists []string // ManifestList path submitted at each CommitTable
+       observedSeqNums       []int64  // SequenceNumber submitted at each CommitTable
+
+       committedSnapshot *Snapshot // populated when CommitTable accepts
+       peerCount         int       // running peer-commit counter (= total-data-files contributed by peers)
+}
+
+func (c *progressingRebuildCatalog) LoadTable(_ context.Context, ident Identifier) (*Table, error) {
+       c.loadedLastSeqNums = append(c.loadedLastSeqNums, c.metadata.LastSequenceNumber())
+
+       return New(ident, c.metadata, "",
+               func(context.Context) (iceio.IO, error) { return c.wfs, nil }, c), nil
+}
+
+func (c *progressingRebuildCatalog) CommitTable(_ context.Context, _ Identifier, _ []Requirement, updates []Update) (Metadata, string, error) {
+       n := c.commitTableCalls.Add(1)
+
+       // Capture the manifest-list path and sequence number being submitted
+       // this attempt — the test asserts each retry produces a unique path
+       // and a strictly advancing seq number.
+       for _, u := range updates {
+               su, ok := u.(*addSnapshotUpdate)
+               if !ok {
+                       continue
+               }
+               c.observedManifestLists = append(c.observedManifestLists, su.Snapshot.ManifestList)
+               c.observedSeqNums = append(c.observedSeqNums, su.Snapshot.SequenceNumber)
+
+               break
+       }
+
+       if int(n) <= c.failTimes {
+               c.graftPeer()
+
+               return nil, "", ErrCommitFailed
+       }
+
+       // Accept: capture the submitted snapshot for assertions and apply
+       // the updates so c.metadata.CurrentSnapshot() reflects the commit.
+       for _, u := range updates {
+               if su, ok := u.(*addSnapshotUpdate); ok {
+                       snapCopy := *su.Snapshot
+                       c.committedSnapshot = &snapCopy
+
+                       break
+               }
+       }
+       builder, err := MetadataBuilderFromBase(c.metadata, "")
+       if err != nil {
+               return nil, "", err
+       }
+       for _, u := range updates {
+               if applyErr := u.Apply(builder); applyErr != nil {
+                       return nil, "", applyErr
+               }
+       }
+       out, err := builder.Build()
+       if err != nil {
+               return nil, "", err
+       }
+       c.metadata = out
+
+       return out, "", nil
+}
+
+// graftPeer adds a peer snapshot (with a real, readable empty manifest list)
+// to c.metadata and advances LastSequenceNumber. The peer's Summary contains
+// "total-data-files" = c.peerCount (cumulative), so the producer's rebuild
+// closure observes summary chaining when it rebases against this freshParent.
+func (c *progressingRebuildCatalog) graftPeer() {
+       c.peerCount++
+       peerID := int64(9_000) + int64(c.peerCount)
+       peerSeq := c.metadata.LastSequenceNumber() + 1
+
+       manifestListPath := 
fmt.Sprintf("%s/metadata/peer-%d-manifest-list.avro", c.location, peerID)
+       out, err := c.wfs.Create(manifestListPath)
+       if err != nil {
+               panic(err)
+       }
+       if writeErr := iceberg.WriteManifestList(2, out, peerID, nil, &peerSeq, 
0, nil); writeErr != nil {
+               panic(writeErr)
+       }
+       if closeErr := out.Close(); closeErr != nil {
+               panic(closeErr)
+       }
+
+       parent := c.metadata.SnapshotByName(c.branch)
+       var parentID *int64
+       if parent != nil {
+               id := parent.SnapshotID
+               parentID = &id
+       }
+
+       builder, err := MetadataBuilderFromBase(c.metadata, "")
+       if err != nil {
+               panic(err)
+       }
+       count := strconv.Itoa(c.peerCount)
+       if addErr := builder.AddSnapshot(&Snapshot{
+               SnapshotID:       peerID,
+               ParentSnapshotID: parentID,
+               SequenceNumber:   peerSeq,
+               TimestampMs:      c.metadata.LastUpdatedMillis() + 1,
+               ManifestList:     manifestListPath,
+               Summary: &Summary{
+                       Operation: OpAppend,
+                       Properties: iceberg.Properties{
+                               "total-data-files":       count,
+                               "total-records":          count,
+                               "total-files-size":       count,
+                               "total-delete-files":     "0",
+                               "total-position-deletes": "0",
+                               "total-equality-deletes": "0",
+                       },
+               },
+       }); addErr != nil {
+               panic(addErr)
+       }
+       if refErr := builder.SetSnapshotRef(c.branch, peerID, BranchRef); 
refErr != nil {
+               panic(refErr)
+       }
+       out2, err := builder.Build()
+       if err != nil {
+               panic(err)
+       }
+       c.metadata = out2
+}
+
+// TestDoCommit_RetryProgressesFreshMeta is the end-to-end regression test
+// for reviewer R4. It drives doCommit through ≥2 ErrCommitFailed retries
+// with a freshMeta that progresses between attempts, and asserts every
+// PR fix is exercised at once:
+//
+//   - Fix 3 (newSeq from freshMeta): the rebuilt SequenceNumber observed
+//     on each retry strictly advances and matches 
freshMeta.LastSequenceNumber+1.
+//   - Fix 2 (summary chained against fresh parent): the final committed
+//     summary's total-data-files == peerCount + this producer's adds.
+//   - Fix 6 (orphan defer cleanup): the original (attempt-0) and intermediate
+//     rebuild manifest-list paths are deleted; the live committed path is
+//     preserved in wfs.files.
+//   - "Orphan list grows by N": each retry submits a DISTINCT ManifestList
+//     path (proves the rebuild closure rewrote the list every attempt).
+//
+// A regression that cached freshMeta across retries, skipped the rebuild,
+// or carried over the captured-attempt-0 summary would fail one or more
+// of these assertions.
+func TestDoCommit_RetryProgressesFreshMeta(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       // Build a real producer commit (with a real rebuildManifestList 
closure).
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, reqs, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+       require.Contains(t, wfs.files, originalManifestList,
+               "producer must persist the attempt-0 manifest list before 
doCommit runs")
+
+       cat := &progressingRebuildCatalog{
+               metadata:  meta,
+               wfs:       wfs,
+               location:  "mem://default/table-location",
+               branch:    MainBranch,
+               failTimes: 2, // 2 fails + 1 success = 3 CommitTable calls
+       }
+       tbl = newOCCTable(t, meta, wfs, cat)
+
+       _, err = tbl.doCommit(context.Background(), updates, reqs, 
withCommitBranch(MainBranch))
+       require.NoError(t, err, "doCommit must succeed on the third attempt")
+
+       // (a) commitTableCalls: 2 fails + 1 success.
+       require.Equal(t, int32(3), cat.commitTableCalls.Load(),
+               "failTimes=2 → 2 ErrCommitFailed + 1 success = 3 CommitTable 
calls")
+
+       // (b) Orphan list grows by N: every attempt's submitted ManifestList
+       // path must be DISTINCT. Attempt 0 submits the producer's original;
+       // attempts 1 and 2 submit paths written by the rebuild closure.
+       require.Len(t, cat.observedManifestLists, 3)
+       seen := make(map[string]struct{}, 3)
+       for i, p := range cat.observedManifestLists {
+               require.NotEmpty(t, p, "attempt %d submitted an empty manifest 
list path", i)
+               _, dup := seen[p]
+               require.False(t, dup,
+                       "every retry must submit a UNIQUE manifest list path 
(orphan list grows by N): %v",
+                       cat.observedManifestLists)
+               seen[p] = struct{}{}
+       }
+       require.Equal(t, originalManifestList, cat.observedManifestLists[0],
+               "attempt 0 must submit the producer's original manifest list")
+
+       // (c) Each rebuild observed a different LastSequenceNumber. LoadTable
+       // is called once per retry (attempts 1 and 2 here — attempt 0 uses
+       // the writer's own base), so we expect 2 entries that strictly advance.
+       require.Len(t, cat.loadedLastSeqNums, 2,
+               "LoadTable must be called once per retry attempt (=2)")
+       require.Greater(t, cat.loadedLastSeqNums[1], cat.loadedLastSeqNums[0],
+               "freshMeta.LastSequenceNumber() must strictly advance across 
retries: %v",
+               cat.loadedLastSeqNums)
+
+       // (d) Fix 3 — newSeq derives from freshMeta.LastSequenceNumber()+1 on
+       // each retry. Submitted seq numbers must strictly increase across the
+       // 3 attempts, AND each rebuild (attempts 1 and 2) must submit the
+       // LastSequenceNumber observed by its own LoadTable, plus one.
+       require.Len(t, cat.observedSeqNums, 3)
+       require.Greater(t, cat.observedSeqNums[1], cat.observedSeqNums[0],
+               "rebuild attempt 1 must submit a strictly higher seq num than 
attempt 0: %v",
+               cat.observedSeqNums)
+       require.Greater(t, cat.observedSeqNums[2], cat.observedSeqNums[1],
+               "rebuild attempt 2 must submit a strictly higher seq num than 
attempt 1: %v",
+               cat.observedSeqNums)
+       require.Equal(t, cat.loadedLastSeqNums[0]+1, cat.observedSeqNums[1],
+               "attempt-1 rebuild seq must equal 
freshMeta.LastSequenceNumber()+1 from its LoadTable")
+       require.Equal(t, cat.loadedLastSeqNums[1]+1, cat.observedSeqNums[2],
+               "attempt-2 rebuild seq must equal 
freshMeta.LastSequenceNumber()+1 from its LoadTable")
+
+       // (e) Fix 2 — summary chains on top of an evolving freshParent.
+       // 2 peer commits (cumulative total-data-files=1, then =2) +
+       // this producer's 1 file → final total-data-files == 3.
+       require.NotNil(t, cat.committedSnapshot, "committed snapshot must be 
captured")
+       require.NotNil(t, cat.committedSnapshot.Summary)
+       gotDataFiles := 
cat.committedSnapshot.Summary.Properties.GetInt("total-data-files", -1)
+       require.Equal(t, 3, gotDataFiles,
+               "committed total-data-files must = peer cumulative (2) + 
producer (1) = 3; "+
+                       "a regression that ignored freshParent.Summary would 
publish 1")
+
+       // (f) Fix 6 — defer cleanup: the live committed manifest list is
+       // preserved in wfs.files; the original and intermediate rebuilds
+       // were orphans and have been removed.
+       committedML := cat.committedSnapshot.ManifestList
+       require.Equal(t, cat.observedManifestLists[2], committedML,
+               "the third (accepted) submission must be the live committed 
manifest list")
+       require.Contains(t, wfs.files, committedML,
+               "live committed manifest list must be preserved by the cleanup 
defer")
+       require.NotContains(t, wfs.files, originalManifestList,
+               "attempt-0 manifest list must be cleaned as orphan after 
success")
+       require.NotContains(t, wfs.files, cat.observedManifestLists[1],
+               "attempt-1 rebuild manifest list must be cleaned as orphan 
after success")
+}
diff --git a/table/metadata.go b/table/metadata.go
index 61aca395..e6a06d4a 100644
--- a/table/metadata.go
+++ b/table/metadata.go
@@ -416,6 +416,29 @@ func (b *MetadataBuilder) AddPartitionSpec(spec 
*iceberg.PartitionSpec, initial
 }
 
 func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) error {
+       return b.addSnapshotInternal(snapshot, nil)
+}
+
+// AddSnapshotUpdate adds a snapshot to the builder and stores the supplied
+// *addSnapshotUpdate as the corresponding entry in builder.updates, preserving
+// runtime-only fields (such as the manifest-list rebuild closure used by the
+// OCC retry path) that would be lost if a fresh update object were 
constructed.
+//
+// Callers without runtime-only fields should keep using AddSnapshot, which
+// constructs a default *addSnapshotUpdate internally.
+func (b *MetadataBuilder) AddSnapshotUpdate(u *addSnapshotUpdate) error {
+       if u == nil {
+               return nil
+       }
+
+       return b.addSnapshotInternal(u.Snapshot, u)
+}
+
+// addSnapshotInternal contains the shared validation and bookkeeping for both
+// AddSnapshot and AddSnapshotUpdate. When preserveUpdate is non-nil it is
+// appended to b.updates verbatim; otherwise a fresh *addSnapshotUpdate is
+// created via NewAddSnapshotUpdate.
+func (b *MetadataBuilder) addSnapshotInternal(snapshot *Snapshot, 
preserveUpdate *addSnapshotUpdate) error {
        if snapshot == nil {
                return nil
        }
@@ -450,7 +473,11 @@ func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) 
error {
                return err
        }
 
-       b.updates = append(b.updates, NewAddSnapshotUpdate(snapshot))
+       upd := preserveUpdate
+       if upd == nil {
+               upd = NewAddSnapshotUpdate(snapshot)
+       }
+       b.updates = append(b.updates, upd)
        b.lastUpdatedMS = snapshot.TimestampMs
        b.lastSequenceNumber = &snapshot.SequenceNumber
        b.snapshotList = append(b.snapshotList, *snapshot)
@@ -979,7 +1006,7 @@ func (b *MetadataBuilder) SnapshotByID(id int64) 
(*Snapshot, error) {
                }
        }
 
-       return nil, fmt.Errorf("snapshot with id %d not found", id)
+       return nil, fmt.Errorf("%w: id %d", ErrSnapshotNotFound, id)
 }
 
 func (b *MetadataBuilder) NameMapping() iceberg.NameMapping {
diff --git a/table/metadata_builder_internal_test.go 
b/table/metadata_builder_internal_test.go
index 8754144d..eb8f57ef 100644
--- a/table/metadata_builder_internal_test.go
+++ b/table/metadata_builder_internal_test.go
@@ -445,7 +445,9 @@ func TestSetRef(t *testing.T) {
 
        require.NoError(t, builder.AddSnapshot(&snapshot))
        err := builder.SetSnapshotRef(MainBranch, 10, BranchRef, 
WithMinSnapshotsToKeep(10))
-       require.ErrorContains(t, err, "can't set snapshot ref main to unknown 
snapshot 10: snapshot with id 10 not found")
+       require.ErrorIs(t, err, ErrSnapshotNotFound,
+               "missing snapshot lookup must wrap ErrSnapshotNotFound so 
callers can detect via errors.Is")
+       require.ErrorContains(t, err, "can't set snapshot ref main to unknown 
snapshot 10")
        require.NoError(t, builder.SetSnapshotRef(MainBranch, 1, BranchRef, 
WithMinSnapshotsToKeep(10)))
        require.Len(t, builder.snapshotList, 1)
        snap, err := builder.SnapshotByID(1)
diff --git a/table/occ_scenario_test.go b/table/occ_scenario_test.go
new file mode 100644
index 00000000..64ad9d91
--- /dev/null
+++ b/table/occ_scenario_test.go
@@ -0,0 +1,257 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+// Scenario-level regression test for the OCC manifest-list rebuild fix.
+//
+// Ported from the integration test suite to serve as a fast, local regression
+// guard.  Uses real on-disk Parquet and Avro files (local FS temp dir) but
+// does not require Docker or a remote catalog.
+
+import (
+       "context"
+       "fmt"
+       "path/filepath"
+       "sync/atomic"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table"
+       "github.com/stretchr/testify/suite"
+)
+
+// ---------------------------------------------------------------------------
+// Mock catalog
+// ---------------------------------------------------------------------------
+
+// occScenarioCatalog simulates N CAS conflicts (HTTP 409) followed by a
+// successful commit.  LoadTable always returns the current metadata so that
+// the retry loop can rebase against it.
+type occScenarioCatalog struct {
+       current          table.Metadata
+       conflictsLeft    int
+       loadTableCalls   atomic.Int32
+       commitTableCalls atomic.Int32
+       location         string
+}
+
+func (c *occScenarioCatalog) LoadTable(_ context.Context, _ table.Identifier) 
(*table.Table, error) {
+       c.loadTableCalls.Add(1)
+
+       return table.New(
+               []string{"default", "occ_scenario_test"},
+               c.current,
+               c.location+"/metadata/current.metadata.json",
+               func(_ context.Context) (iceio.IO, error) { return 
iceio.LocalFS{}, nil },
+               c,
+       ), nil
+}
+
+func (c *occScenarioCatalog) CommitTable(
+       _ context.Context,
+       _ table.Identifier,
+       _ []table.Requirement,
+       updates []table.Update,
+) (table.Metadata, string, error) {
+       c.commitTableCalls.Add(1)
+
+       if c.conflictsLeft > 0 {
+               c.conflictsLeft--
+
+               return nil, "", fmt.Errorf("%w: simulated 409 conflict", 
table.ErrCommitFailed)
+       }
+
+       newMeta, err := table.UpdateTableMetadata(c.current, updates, "")
+       if err != nil {
+               return nil, "", err
+       }
+
+       c.current = newMeta
+
+       return newMeta, c.location + "/metadata/final.metadata.json", nil
+}
+
+// ---------------------------------------------------------------------------
+// Test suite
+// ---------------------------------------------------------------------------
+
+type OCCScenarioTestSuite struct {
+       suite.Suite
+       ctx      context.Context
+       location string
+}
+
+func TestOCCScenario(t *testing.T) {
+       suite.Run(t, new(OCCScenarioTestSuite))
+}
+
+func (s *OCCScenarioTestSuite) SetupSuite() {
+       s.ctx = context.Background()
+}
+
+func (s *OCCScenarioTestSuite) SetupTest() {
+       s.location = filepath.ToSlash(s.T().TempDir())
+}
+
+func (s *OCCScenarioTestSuite) makeTable(
+       conflicts int,
+       extraProps iceberg.Properties,
+) (*table.Table, *occScenarioCatalog) {
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64},
+               iceberg.NestedField{ID: 2, Name: "val", Type: 
iceberg.PrimitiveTypes.String},
+       )
+
+       props := iceberg.Properties{table.PropertyFormatVersion: "2"}
+       for k, v := range extraProps {
+               props[k] = v
+       }
+
+       meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec,
+               table.UnsortedSortOrder, s.location, props)
+       s.Require().NoError(err)
+
+       cat := &occScenarioCatalog{
+               current:       meta,
+               conflictsLeft: conflicts,
+               location:      s.location,
+       }
+
+       tbl := table.New(
+               []string{"default", "occ_scenario_test"},
+               meta,
+               s.location+"/metadata/v1.metadata.json",
+               func(_ context.Context) (iceio.IO, error) { return 
iceio.LocalFS{}, nil },
+               cat,
+       )
+
+       return tbl, cat
+}
+
+func (s *OCCScenarioTestSuite) makeArrowTable() arrow.Table {
+       mem := memory.DefaultAllocator
+       sc := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
+               {Name: "val", Type: arrow.BinaryTypes.String, Nullable: true},
+       }, nil)
+
+       tbl, err := array.TableFromJSON(mem, sc, []string{
+               `[{"id": 1, "val": "hello"}]`,
+       })
+       s.Require().NoError(err)
+
+       return tbl
+}
+
+// ---------------------------------------------------------------------------
+// Regression test
+// ---------------------------------------------------------------------------
+
+// TestManifestListInheritedAfterConflict is a regression test for the bug
+// where a retried snapshot reused the original manifest list (built against a
+// stale parent) instead of inheriting the manifests already committed by
+// concurrent writers.
+//
+// Scenario:
+//   - Writer B commits one row to an empty table (snapshot B, manifest B).
+//   - Writer A starts a transaction from the empty base (no knowledge of B).
+//   - Writer A's first commit attempt fails with ErrCommitFailed (409).
+//   - doCommit reloads and gets snapshot B as the fresh head.
+//   - The rebuildManifestList closure rewrites the manifest list to contain
+//     manifest A (Writer A's file) and manifest B (inherited).
+//   - Writer A's second attempt succeeds.
+//
+// Without the fix the final snapshot contains only manifest A and a scan
+// returns 1 row instead of 2.
+func (s *OCCScenarioTestSuite) TestManifestListInheritedAfterConflict() {
+       props := iceberg.Properties{
+               table.CommitMinRetryWaitMsKey:      "0",
+               table.CommitMaxRetryWaitMsKey:      "0",
+               table.CommitTotalRetryTimeoutMsKey: "60000",
+               table.CommitNumRetriesKey:          "2",
+       }
+
+       // Step 1: commit Writer B's row to an empty table.
+       // This writes real Parquet + manifest Avro files to 
s.location/metadata/.
+       emptyTbl, catB := s.makeTable(0, props)
+       rowB := s.makeArrowTable()
+       defer rowB.Release()
+
+       txB := emptyTbl.NewTransaction()
+       s.Require().NoError(txB.AppendTable(s.ctx, rowB, rowB.NumRows(), nil))
+
+       _, err := txB.Commit(s.ctx)
+       s.Require().NoError(err, "Writer B must commit successfully")
+
+       metaAfterB := catB.current // catalog state after B's commit (real 
files on disk)
+
+       // Step 2: Writer A's catalog starts with B's committed state.
+       // It returns ErrCommitFailed once (simulating B having committed just
+       // before A), then accepts the retry.
+       catA := &occScenarioCatalog{
+               current:       metaAfterB,
+               conflictsLeft: 1,
+               location:      s.location,
+       }
+
+       // Step 3: Writer A's table starts from the EMPTY base — it loaded the
+       // table before B committed.
+       writerATable := table.New(
+               emptyTbl.Identifier(),
+               emptyTbl.Metadata(), // empty: no knowledge of B's snapshot yet
+               s.location+"/metadata/v1.metadata.json",
+               func(_ context.Context) (iceio.IO, error) { return 
iceio.LocalFS{}, nil },
+               catA,
+       )
+
+       rowA := s.makeArrowTable()
+       defer rowA.Release()
+
+       txA := writerATable.NewTransaction()
+       s.Require().NoError(txA.AppendTable(s.ctx, rowA, rowA.NumRows(), nil))
+
+       _, err = txA.Commit(s.ctx)
+       s.Require().NoError(err, "Writer A must succeed after one conflict 
retry")
+
+       s.Equal(int32(2), catA.commitTableCalls.Load(),
+               "expected 2 commit attempts: 1 conflict + 1 success")
+
+       // Step 4: the final snapshot must reference BOTH manifests.
+       // manifest A (Writer A's data file) + manifest B (inherited from B's 
snapshot).
+       // Without the fix, only manifest A is present and the count is 1.
+       finalSnap := catA.current.CurrentSnapshot()
+       s.Require().NotNil(finalSnap, "committed table must have a current 
snapshot")
+
+       fio := iceio.LocalFS{}
+       manifests, err := finalSnap.Manifests(fio)
+       s.Require().NoError(err, "must be able to read manifest list from disk")
+       s.Require().Len(manifests, 2,
+               "expected 2 manifests (Writer A + Writer B); got %d — "+
+                       "manifest list not inherited on retry", len(manifests))
+
+       // Each manifest must have exactly 1 data file (added or existing).
+       for i, mf := range manifests {
+               count := mf.AddedDataFiles() + mf.ExistingDataFiles()
+               s.EqualValues(1, count,
+                       "manifest[%d] should have 1 data file, got %d", i, 
count)
+       }
+}
diff --git a/table/rebuild_manifest_test.go b/table/rebuild_manifest_test.go
new file mode 100644
index 00000000..ea742b84
--- /dev/null
+++ b/table/rebuild_manifest_test.go
@@ -0,0 +1,515 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+// newMetadataWithLastSeqNum builds a v2 Metadata whose last-sequence-number
+// equals lastSeqNum by grafting a synthetic snapshot at that sequence number.
+// Used by tests that need to control freshMeta.LastSequenceNumber().
+func newMetadataWithLastSeqNum(t *testing.T, lastSeqNum int64) Metadata {
+       t.Helper()
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+       meta, err := NewMetadata(schema, iceberg.UnpartitionedSpec, 
UnsortedSortOrder, "file:///tmp/seqtest",
+               iceberg.Properties{PropertyFormatVersion: "2"})
+       require.NoError(t, err)
+       builder, err := MetadataBuilderFromBase(meta, "")
+       require.NoError(t, err)
+       snap := Snapshot{
+               SnapshotID:     1000,
+               SequenceNumber: lastSeqNum,
+               TimestampMs:    meta.LastUpdatedMillis() + 1,
+               Summary:        &Summary{Operation: OpAppend},
+       }
+       require.NoError(t, builder.AddSnapshot(&snap))
+       require.NoError(t, builder.SetSnapshotRef(MainBranch, 1000, BranchRef))
+       out, err := builder.Build()
+       require.NoError(t, err)
+
+       return out
+}
+
+// newV3MetadataWithNextRowID builds a v3 Metadata whose NextRowID() returns
+// nextRowID by adding a synthetic snapshot that consumes that many rows.
+// Used by tests that need to control freshMeta.NextRowID() for v3 row lineage.
+func newV3MetadataWithNextRowID(t *testing.T, nextRowID int64) Metadata {
+       t.Helper()
+       spec := iceberg.NewPartitionSpec()
+       txn, _ := createTestTransactionWithMemIO(t, spec)
+       txn.meta.formatVersion = 3
+
+       firstRowID := int64(0)
+       addedRows := nextRowID
+       snap := Snapshot{
+               SnapshotID:     1000,
+               SequenceNumber: 1,
+               TimestampMs:    txn.meta.base.LastUpdatedMillis() + 1,
+               Summary:        &Summary{Operation: OpAppend},
+               FirstRowID:     &firstRowID,
+               AddedRows:      &addedRows,
+       }
+       require.NoError(t, txn.meta.AddSnapshot(&snap))
+       require.NoError(t, txn.meta.SetSnapshotRef(MainBranch, 1000, BranchRef))
+
+       meta, err := txn.meta.Build()
+       require.NoError(t, err)
+       require.Equal(t, nextRowID, meta.NextRowID())
+
+       return meta
+}
+
+// ptr returns a pointer to v, used in test helper expressions.
+// NOTE: ptr is also declared in pos_delete_partitioned_fanout_writer_test.go;
+// both files share the same package so only one declaration is needed.
+// This comment documents the shared usage — do not re-add a declaration here.
+
+// rebuildUpdate constructs an addSnapshotUpdate whose rebuildManifestList
+// closure simply records the freshParent it received and returns a new
+// snapshot whose ManifestList is the given newManifestList value.
+func rebuildUpdate(snap *Snapshot, newManifestList string, gotParent 
**Snapshot) *addSnapshotUpdate {
+       return &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
+               Snapshot:   snap,
+               rebuildManifestList: func(_ context.Context, _ Metadata, 
freshParent *Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+                       *gotParent = freshParent
+                       rebuilt := *snap
+                       rebuilt.ManifestList = newManifestList
+
+                       return &rebuilt, nil
+               },
+       }
+}
+
+// TestRebuildSnapshotUpdates_CallsClosureWithFreshParent verifies that
+// rebuildSnapshotUpdates invokes the rebuild closure and passes the fresh
+// branch head as freshParent.
+func TestRebuildSnapshotUpdates_CallsClosureWithFreshParent(t *testing.T) {
+       const oldManifest = "s3://bucket/old-manifest-list.avro"
+       const newManifest = "s3://bucket/new-manifest-list.avro"
+
+       // Build a fresh metadata that has a different snapshot than the one
+       // embedded in the update, so the parent-hasn't-changed guard is
+       // bypassed and the closure must be called.
+       freshHead := int64(42)
+       freshMeta := newConflictTestMetadata(t, &freshHead)
+
+       parentID := int64(7) // original snap's parent — does NOT match 
freshHead (42)
+       snap := &Snapshot{
+               SnapshotID:       99,
+               ParentSnapshotID: &parentID,
+               ManifestList:     oldManifest,
+               Summary:          &Summary{Operation: OpAppend},
+       }
+
+       var receivedParent *Snapshot
+       upd := rebuildUpdate(snap, newManifest, &receivedParent)
+
+       rebuilt, orphaned, err := rebuildSnapshotUpdates(
+               t.Context(),
+               []Update{upd},
+               freshMeta,
+               MainBranch,
+               iceio.LocalFS{},
+               1,
+       )
+       require.NoError(t, err)
+       require.Len(t, rebuilt, 1)
+       require.Len(t, orphaned, 1, "old manifest list must be recorded as 
orphaned")
+
+       // The closure should have been called with the branch's current head.
+       require.NotNil(t, receivedParent)
+       assert.Equal(t, freshHead, receivedParent.SnapshotID)
+
+       // The rebuilt update must carry the new manifest list.
+       addUpd, ok := rebuilt[0].(*addSnapshotUpdate)
+       require.True(t, ok)
+       assert.Equal(t, newManifest, addUpd.Snapshot.ManifestList)
+
+       // The superseded manifest list becomes an orphan.
+       assert.Equal(t, oldManifest, orphaned[0])
+}
+
+// TestRebuildSnapshotUpdates_SkipsWhenParentUnchanged verifies that
+// rebuildSnapshotUpdates skips the rebuild when the update's snapshot
+// already has the fresh branch head as its parent (no-op retry).
+func TestRebuildSnapshotUpdates_SkipsWhenParentUnchanged(t *testing.T) {
+       const manifest = "s3://bucket/manifest-list.avro"
+
+       freshHead := int64(42)
+       freshMeta := newConflictTestMetadata(t, &freshHead)
+
+       // Parent already equals the fresh head — rebuild must be skipped.
+       snap := &Snapshot{
+               SnapshotID:       99,
+               ParentSnapshotID: &freshHead, // same as fresh head
+               ManifestList:     manifest,
+               Summary:          &Summary{Operation: OpAppend},
+       }
+
+       called := false
+       upd := &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
+               Snapshot:   snap,
+               rebuildManifestList: func(_ context.Context, _ Metadata, _ 
*Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+                       called = true
+
+                       return snap, nil
+               },
+       }
+
+       rebuilt, orphaned, err := rebuildSnapshotUpdates(
+               t.Context(),
+               []Update{upd},
+               freshMeta,
+               MainBranch,
+               iceio.LocalFS{},
+               1,
+       )
+       require.NoError(t, err)
+       assert.False(t, called, "rebuild closure must not be called when parent 
is already up-to-date")
+       assert.Empty(t, orphaned, "no orphans when rebuild is skipped")
+       assert.Same(t, upd, rebuilt[0].(*addSnapshotUpdate), "original update 
must pass through unchanged")
+}
+
+// TestRebuildSnapshotUpdates_PassesThroughNonRebuildUpdates verifies that
+// updates without a rebuildManifestList closure are returned unmodified.
+func TestRebuildSnapshotUpdates_PassesThroughNonRebuildUpdates(t *testing.T) {
+       plainUpd := NewAddSnapshotUpdate(&Snapshot{
+               SnapshotID:   1,
+               ManifestList: "s3://bucket/no-rebuild.avro",
+               Summary:      &Summary{Operation: OpAppend},
+       })
+
+       // freshMeta may be nil — the plain update must not be touched.
+       rebuilt, orphaned, err := rebuildSnapshotUpdates(
+               t.Context(),
+               []Update{plainUpd},
+               nil,
+               MainBranch,
+               iceio.LocalFS{},
+               0,
+       )
+       require.NoError(t, err)
+       assert.Empty(t, orphaned)
+       assert.Same(t, plainUpd, rebuilt[0].(*addSnapshotUpdate))
+}
+
+// TestRebuildSnapshotUpdates_PropagatesClosureError verifies that an error
+// returned by the rebuild closure surfaces as the function's return error.
+func TestRebuildSnapshotUpdates_PropagatesClosureError(t *testing.T) {
+       freshHead := int64(5)
+       freshMeta := newConflictTestMetadata(t, &freshHead)
+
+       parent := int64(1)
+       snap := &Snapshot{
+               SnapshotID:       10,
+               ParentSnapshotID: &parent,
+               ManifestList:     "s3://bucket/old.avro",
+               Summary:          &Summary{Operation: OpAppend},
+       }
+       wantErr := errors.New("simulated S3 write failure")
+       upd := &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
+               Snapshot:   snap,
+               rebuildManifestList: func(_ context.Context, _ Metadata, _ 
*Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+                       return nil, wantErr
+               },
+       }
+
+       _, _, err := rebuildSnapshotUpdates(t.Context(), []Update{upd}, 
freshMeta, MainBranch, iceio.LocalFS{}, 1)
+       assert.ErrorIs(t, err, wantErr)
+}
+
+// ---------------------------------------------------------------------------
+// Fix 3 — newSeq derived from freshMeta.LastSequenceNumber()
+// ---------------------------------------------------------------------------
+
+// TestRebuildFn_SeqNumDerivedFromFreshMeta verifies that the rebuilt 
snapshot's
+// SequenceNumber equals freshMeta.LastSequenceNumber()+1, NOT
+// freshParent.SequenceNumber+1. A concurrent writer on a different branch can
+// advance the table-wide last-sequence-number without advancing this branch's
+// parent, so using freshParent.SequenceNumber+1 would violate the spec 
invariant.
+func TestRebuildFn_SeqNumDerivedFromFreshMeta(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       txn, wfs := createTestTransactionWithMemIO(t, spec)
+
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, _, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       require.NotNil(t, addSnap.rebuildManifestList, "rebuildManifestList 
closure must be set")
+
+       // Simulate a concurrent writer on another branch that bumped the global
+       // last-sequence-number to 99 without advancing this branch's parent.
+       // old code: newSeq = capturedSnapshot.SequenceNumber (stale, ≤ 99 — 
spec violation)
+       // new code: newSeq = freshMeta.LastSequenceNumber() + 1 = 100
+       freshMeta := newMetadataWithLastSeqNum(t, 99)
+       require.Equal(t, int64(99), freshMeta.LastSequenceNumber())
+
+       rebuilt, err := addSnap.rebuildManifestList(context.Background(), 
freshMeta, nil, wfs, 1)
+       require.NoError(t, err)
+       require.Equal(t, int64(100), rebuilt.SequenceNumber,
+               "SequenceNumber must be freshMeta.LastSequenceNumber()+1; using 
freshParent.SequenceNumber+1 violates the spec when another branch advanced the 
global counter")
+}
+
+// TestRebuildFn_SeqNumV1TableIsZero verifies that v1 tables keep 
SequenceNumber == 0
+// regardless of what freshMeta reports (v1 does not use sequence numbers).
+func TestRebuildFn_SeqNumV1TableIsZero(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       txn, wfs := createTestTransactionWithMemIO(t, spec)
+       txn.meta.formatVersion = 1 // override to v1
+
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, _, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       require.NotNil(t, addSnap.rebuildManifestList)
+
+       freshMeta := newMetadataWithLastSeqNum(t, 99)
+       rebuilt, err := addSnap.rebuildManifestList(context.Background(), 
freshMeta, nil, wfs, 1)
+       require.NoError(t, err)
+       require.Equal(t, int64(0), rebuilt.SequenceNumber, "v1 tables must 
always have SequenceNumber == 0")
+}
+
+// ---------------------------------------------------------------------------
+// Fix 1 — firstRowID derived from freshMeta.NextRowID()
+// ---------------------------------------------------------------------------
+
+// TestRebuildFn_V3FirstRowIDDerivedFromFreshMeta verifies that on a v3 table
+// the rebuilt snapshot's FirstRowID equals freshMeta.NextRowID() rather than
+// the hardcoded 0. If two writers race and the peer commits first, the
+// catalog's nextRowID has already advanced; using 0 would produce a
+// first-row-id that disagrees with the catalog's view and fails row-lineage
+// validation.
+func TestRebuildFn_V3FirstRowIDDerivedFromFreshMeta(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       txn, wfs := createTestTransactionWithMemIO(t, spec)
+       txn.meta.formatVersion = 3
+
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, _, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       require.NotNil(t, addSnap.rebuildManifestList)
+
+       // Regression probe: mutate the captured snapshot's AddedRows to a
+       // sentinel that cannot match a correct recomputation. If a future
+       // regression carries this value across instead of recomputing it from
+       // writer.NextRowID()-firstRowID, the assertions below will fail.
+       require.NotNil(t, addSnap.Snapshot.AddedRows, "captured snapshot must 
have AddedRows on v3")
+       capturedAddedRows := *addSnap.Snapshot.AddedRows
+       sentinel := int64(999_999)
+       *addSnap.Snapshot.AddedRows = sentinel
+
+       // Simulate a concurrent writer that added 50 rows, advancing nextRowID 
to 50.
+       // old code: firstRowID = 0 (hardcoded)
+       // new code: firstRowID = freshMeta.NextRowID() = 50
+       freshMeta := newV3MetadataWithNextRowID(t, 50)
+       require.Equal(t, int64(50), freshMeta.NextRowID(), 
"freshMeta.NextRowID() must be 50")
+
+       rebuilt, err := addSnap.rebuildManifestList(context.Background(), 
freshMeta, nil, wfs, 1)
+       require.NoError(t, err)
+       require.NotNil(t, rebuilt.FirstRowID, "v3 rebuilt snapshot must have 
FirstRowID")
+       require.Equal(t, int64(50), *rebuilt.FirstRowID,
+               "FirstRowID must equal freshMeta.NextRowID(); hardcoded 0 would 
conflict with catalog's advanced nextRowID")
+
+       // AddedRows must be RECOMPUTED from writer.NextRowID()-firstRowID, not
+       // carried over from the captured snapshot. The producer wrote a single
+       // 1-row file, so the correct recomputed value is 1 — which is also what
+       // capturedAddedRows happened to be before mutation. The sentinel makes
+       // the difference visible: if the rebuild path reused the captured slot,
+       // rebuilt.AddedRows would equal sentinel instead.
+       require.NotNil(t, rebuilt.AddedRows, "v3 rebuilt snapshot must have 
AddedRows")
+       require.NotEqual(t, sentinel, *rebuilt.AddedRows,
+               "AddedRows must be recomputed; carrying the captured value 
(mutated to sentinel) would be a regression")
+       require.Equal(t, int64(1), *rebuilt.AddedRows,
+               "AddedRows must equal writer.NextRowID()-firstRowID (this 
producer's 1 data row)")
+
+       // Restore the original value so later reads of the captured snapshot are not
misleading.
+       *addSnap.Snapshot.AddedRows = capturedAddedRows
+}
+
+// TestRebuildFn_V3FirstRowIDZeroWhenNilNextRowID verifies that on a v3 table
+// whose freshMeta has NextRowID()==0 (brand-new table — no rows yet) the
+// rebuilt snapshot's FirstRowID is 0 and AddedRows reflects only the rows
+// added by this producer. The Metadata API returns int64 (not *int64), so
+// "nil NextRowID" is encoded as the zero value.
+func TestRebuildFn_V3FirstRowIDZeroWhenNilNextRowID(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       txn, wfs := createTestTransactionWithMemIO(t, spec)
+       txn.meta.formatVersion = 3
+
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, _, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       require.NotNil(t, addSnap.rebuildManifestList)
+
+       // Regression probe: mutate the captured snapshot's AddedRows to a
+       // sentinel that cannot match a correct recomputation.
+       require.NotNil(t, addSnap.Snapshot.AddedRows, "captured snapshot must 
have AddedRows on v3")
+       sentinel := int64(888_888)
+       *addSnap.Snapshot.AddedRows = sentinel
+
+       // freshMeta with NextRowID()==0 (the "nil" equivalent for the int64 
API).
+       freshMeta := newV3MetadataWithNextRowID(t, 0)
+       require.Equal(t, int64(0), freshMeta.NextRowID())
+
+       rebuilt, err := addSnap.rebuildManifestList(context.Background(), 
freshMeta, nil, wfs, 1)
+       require.NoError(t, err, "rebuild must not panic when 
freshMeta.NextRowID() is 0")
+       require.NotNil(t, rebuilt.FirstRowID)
+       require.Equal(t, int64(0), *rebuilt.FirstRowID,
+               "FirstRowID must be 0 when freshMeta.NextRowID() is 0 
(brand-new table)")
+       require.NotNil(t, rebuilt.AddedRows)
+       require.NotEqual(t, sentinel, *rebuilt.AddedRows,
+               "AddedRows must be recomputed; carrying the captured (sentinel) 
value would be a regression")
+       require.Equal(t, int64(1), *rebuilt.AddedRows,
+               "AddedRows must reflect only this producer's contribution (1 
row)")
+}
+
+// ---------------------------------------------------------------------------
+// Fix 2 — snapshot summary recomputed against freshParent
+// ---------------------------------------------------------------------------
+
+// TestRebuildFn_SummaryRebasedAgainstFreshParent verifies that the rebuilt
+// snapshot's summary totals are computed against the fresh parent's summary,
+// not the stale totals captured at attempt 0. If a concurrent writer added
+// files between attempt 0 and the retry, publishing the stale totals would
+// regress every consumer that reads the summary.
+func TestRebuildFn_SummaryRebasedAgainstFreshParent(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       txn, wfs := createTestTransactionWithMemIO(t, spec)
+
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, _, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       require.NotNil(t, addSnap.rebuildManifestList)
+
+       // Construct a freshParent with a known summary. The concurrent writer
+       // added 5 data files (total-data-files = 5) before our retry.
+       // Our producer added 1 file (added-data-files = 1 in 
capturedSnapshot.Summary).
+       // Expected rebuilt total-data-files = 5 + 1 = 6.
+       freshParentSummary := iceberg.Properties{
+               "total-data-files":       "5",
+               "total-records":          "500",
+               "total-files-size":       "5000",
+               "total-delete-files":     "0",
+               "total-position-deletes": "0",
+               "total-equality-deletes": "0",
+       }
+
+       // Write an empty manifest list for the freshParent so Manifests() can 
open it.
+       parentManifestListPath := 
"mem://default/table-location/metadata/fresh-parent-snap.avro"
+       out, createErr := wfs.Create(parentManifestListPath)
+       require.NoError(t, createErr)
+       writeErr := iceberg.WriteManifestList(2, out, 77, nil, ptr(int64(0)), 
0, nil)
+       require.NoError(t, writeErr)
+       require.NoError(t, out.Close())
+
+       freshParentID := int64(77)
+       freshParent := &Snapshot{
+               SnapshotID: freshParentID,
+               // Set a SequenceNumber that DIFFERS from 
freshMeta.LastSequenceNumber()
+               // so the assertion below can distinguish "newSeq derived from
+               // freshMeta.LastSequenceNumber()" (correct) from "newSeq 
derived from
+               // freshParent.SequenceNumber" (regression). Using 
freshParentSeq = 99
+               // versus freshMeta.LastSequenceNumber() = 1 makes the two paths
+               // disagree by 98.
+               SequenceNumber: 99,
+               ManifestList:   parentManifestListPath,
+               Summary:        &Summary{Operation: OpAppend, Properties: 
freshParentSummary},
+       }
+       freshMeta := newMetadataWithLastSeqNum(t, 1)
+       require.NotEqual(t, freshParent.SequenceNumber, 
freshMeta.LastSequenceNumber(),
+               "test fixture must keep these distinct so the seq-num source is 
observable")
+
+       rebuilt, err := addSnap.rebuildManifestList(context.Background(), 
freshMeta, freshParent, wfs, 1)
+       require.NoError(t, err)
+       require.NotNil(t, rebuilt.Summary)
+
+       // Pin the seq-num source. Must equal
freshMeta.LastSequenceNumber()+1
+       // (=2) and must NOT equal freshParent.SequenceNumber+1 (=100). A 
regression
+       // that derives newSeq from freshParent would fail this assertion.
+       require.Equal(t, freshMeta.LastSequenceNumber()+1, 
rebuilt.SequenceNumber,
+               "newSeq must derive from freshMeta.LastSequenceNumber(), not 
freshParent.SequenceNumber")
+
+       // All three summary totals asserted below must be rebased
against
+       // freshParent: total-data-files, total-records, total-files-size.
+       // newTestDataFile contributes record count 1 and file size 1 (both 
fields
+       // of the data file are set to 1 by newTestDataFileWithCount default).
+       gotDataFiles := rebuilt.Summary.Properties.GetInt("total-data-files", 
-1)
+       require.Equal(t, 6, gotDataFiles,
+               "total-data-files must be freshParent total (5) + this 
producer's added count (1); stale attempt-0 total would be wrong")
+       gotRecords := rebuilt.Summary.Properties.GetInt("total-records", -1)
+       require.Equal(t, 501, gotRecords,
+               "total-records must be freshParent total (500) + this 
producer's added records (1)")
+       gotFileSize := rebuilt.Summary.Properties.GetInt("total-files-size", -1)
+       require.Equal(t, 5001, gotFileSize,
+               "total-files-size must be freshParent total (5000) + this 
producer's added size (1)")
+}
+
+// TestRebuildFn_SummaryFreshParentNilKeepsCapturedSummary verifies that when
+// freshParent is nil (first snapshot on a brand-new table), the rebuild keeps
+// capturedSnapshot.Summary as-is. There is no prior parent to rebase against,
+// so the attempt-0 summary is the correct base.
+func TestRebuildFn_SummaryFreshParentNilKeepsCapturedSummary(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       txn, wfs := createTestTransactionWithMemIO(t, spec)
+
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, _, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       require.NotNil(t, addSnap.rebuildManifestList)
+
+       capturedSummary := addSnap.Snapshot.Summary
+       require.NotNil(t, capturedSummary)
+
+       freshMeta := newMetadataWithLastSeqNum(t, 1)
+       rebuilt, err := addSnap.rebuildManifestList(context.Background(), 
freshMeta, nil, wfs, 1)
+       require.NoError(t, err)
+       require.NotNil(t, rebuilt.Summary)
+       require.Equal(t, capturedSummary.Operation, rebuilt.Summary.Operation,
+               "with freshParent=nil, rebuilt.Summary must equal 
capturedSnapshot.Summary")
+       require.Equal(t, capturedSummary.Properties, rebuilt.Summary.Properties,
+               "with freshParent=nil, rebuilt.Summary properties must be 
unchanged")
+}
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 9d0f052b..4e98a822 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -826,12 +826,58 @@ func (sp *snapshotProducer) summary(props 
iceberg.Properties) (Summary, error) {
        }, previousSummary)
 }
 
+// computeOwnManifests returns the subset of allManifests that were written
+// by this producer (i.e. not inherited from the parent snapshot). These are
+// preserved across OCC retry attempts when the manifest list is rebuilt
+// against a fresh parent.
+func (sp *snapshotProducer) computeOwnManifests(allManifests 
[]iceberg.ManifestFile) ([]iceberg.ManifestFile, error) {
+       if sp.parentSnapshotID <= 0 {
+               // No parent means all manifests are new — nothing to exclude.
+               return allManifests, nil
+       }
+
+       parent, err := sp.txn.meta.SnapshotByID(sp.parentSnapshotID)
+       if err != nil {
+               return nil, fmt.Errorf("computeOwnManifests: lookup parent 
snapshot %d: %w", sp.parentSnapshotID, err)
+       }
+       if parent == nil {
+               return nil, fmt.Errorf("%w: computeOwnManifests parent id %d", 
ErrSnapshotNotFound, sp.parentSnapshotID)
+       }
+
+       parentManifests, err := parent.Manifests(sp.io)
+       if err != nil {
+               return nil, fmt.Errorf("computeOwnManifests: read parent 
manifests: %w", err)
+       }
+
+       inherited := make(map[string]bool, len(parentManifests))
+       for _, m := range parentManifests {
+               inherited[m.FilePath()] = true
+       }
+
+       own := make([]iceberg.ManifestFile, 0, len(allManifests))
+       for _, m := range allManifests {
+               if !inherited[m.FilePath()] {
+                       own = append(own, m)
+               }
+       }
+
+       return own, nil
+}
+
 func (sp *snapshotProducer) commit(ctx context.Context) (_ []Update, _ 
[]Requirement, err error) {
        newManifests, err := sp.manifests(ctx)
        if err != nil {
                return nil, nil, err
        }
 
+       // Separate "own" manifests (those written by this producer) from
+       // manifests inherited from the stale parent. The own manifests are
+       // preserved when the manifest list is rebuilt during OCC retries.
+       ownManifests, err := sp.computeOwnManifests(newManifests)
+       if err != nil {
+               return nil, nil, err
+       }
+
        nextSequence := sp.txn.meta.nextSequenceNumber()
        summary, err := sp.summary(sp.snapshotProps)
        if err != nil {
@@ -914,9 +960,128 @@ func (sp *snapshotProducer) commit(ctx context.Context) 
(_ []Update, _ []Require
                })
        }
 
+       // Build the manifest-list rebuild closure. It is called by doCommit
+       // on each OCC retry to regenerate the manifest list so it correctly
+       // inherits all data files committed by concurrent writers since the
+       // original snapshot was built.
+       formatVersion := sp.txn.meta.formatVersion
+       snapshotID := sp.snapshotID
+       commitUUID := sp.commitUuid
+       capturedSnapshot := snapshot // copy the value so the closure is 
self-contained
+       processManifestsFn := func(m []iceberg.ManifestFile) 
([]iceberg.ManifestFile, error) {
+               return sp.processManifests(m)
+       }
+
+       rebuildFn := func(_ context.Context, freshMeta Metadata, freshParent 
*Snapshot, fio iceio.WriteFileIO, attempt int) (_ *Snapshot, retErr error) {
+               // Load inherited manifests from the fresh parent.
+               var inherited []iceberg.ManifestFile
+               if freshParent != nil {
+                       inherited, retErr = freshParent.Manifests(fio)
+                       if retErr != nil {
+                               return nil, fmt.Errorf("rebuild manifest list: 
load parent manifests: %w", retErr)
+                       }
+               }
+
+               // Combine own manifests with inherited ones, applying any
+               // producer-specific processing (no-op for fast/merge-append).
+               combined, procErr := 
processManifestsFn(slices.Concat(ownManifests, inherited))
+               if procErr != nil {
+                       return nil, fmt.Errorf("rebuild manifest list: process 
manifests: %w", procErr)
+               }
+
+               // Derive the sequence number from the fresh table-wide 
last-sequence-number.
+               // Using freshParent.SequenceNumber + 1 would violate the spec 
when a
+               // concurrent writer on a different branch bumps 
last-sequence-number
+               // without advancing this branch's parent — 
MetadataBuilder.AddSnapshot
+               // rejects SequenceNumber <= lastSequenceNumber.
+               var newSeq int64
+               if formatVersion >= 2 {
+                       newSeq = freshMeta.LastSequenceNumber() + 1
+               }
+
+               // Write the rebuilt manifest list to a path unique to this 
retry
+               // attempt. Each retry uses a different attempt counter in the 
filename
+               // (snap-{id}-{attempt}-{uuid}.avro) so that S3 
conditional-write
+               // semantics (if-none-match) do not reject the overwrite. 
Orphaned files
+               // from superseded retry attempts are removed by doCommit after 
the
+               // commit succeeds.
+               fname := newManifestListFileName(snapshotID, attempt, 
commitUUID)
+               manifestListPath := locProvider.NewMetadataLocation(fname)
+
+               out, createErr := fio.Create(manifestListPath)
+               if createErr != nil {
+                       return nil, fmt.Errorf("rebuild manifest list: create 
file: %w", createErr)
+               }
+               defer internal.CheckedClose(out, &retErr)
+
+               var parentID *int64
+               if freshParent != nil {
+                       id := freshParent.SnapshotID
+                       parentID = &id
+               }
+
+               firstRowID := int64(0)
+               var addedRows int64
+               if formatVersion == 3 {
+                       // Derive firstRowID from the fresh metadata so the 
manifest-list
+                       // first-row-id field is consistent with the catalog's 
nextRowID
+                       // after concurrent writers have advanced it since 
attempt 0.
+                       firstRowID = freshMeta.NextRowID()
+                       writer, wrErr := iceberg.NewManifestListWriterV3(out, 
snapshotID, newSeq, firstRowID, parentID)
+                       if wrErr != nil {
+                               return nil, fmt.Errorf("rebuild manifest list: 
create v3 writer: %w", wrErr)
+                       }
+                       defer internal.CheckedClose(writer, &retErr)
+                       if addErr := writer.AddManifests(combined); addErr != 
nil {
+                               return nil, fmt.Errorf("rebuild manifest list: 
add manifests: %w", addErr)
+                       }
+                       if writer.NextRowID() != nil {
+                               addedRows = *writer.NextRowID() - firstRowID
+                       }
+               } else {
+                       if wErr := iceberg.WriteManifestList(formatVersion, 
out, snapshotID, parentID, &newSeq, firstRowID, combined); wErr != nil {
+                               return nil, fmt.Errorf("rebuild manifest list: 
write: %w", wErr)
+                       }
+               }
+
+               rebuilt := capturedSnapshot
+               rebuilt.ManifestList = manifestListPath
+               rebuilt.ParentSnapshotID = parentID
+               rebuilt.SequenceNumber = newSeq
+               if formatVersion == 3 {
+                       rebuilt.FirstRowID = &firstRowID
+                       rebuilt.AddedRows = &addedRows
+               }
+
+               // Recompute snapshot summary against the fresh parent so that 
totals
+               // (total-records, total-data-files, total-files-size) are not 
regressed
+               // to the stale values captured at attempt 0. The per-operation 
delta
+               // (added-data-files, added-records, etc.) is preserved in
+               // capturedSnapshot.Summary and is replayed on top of the fresh 
base.
+               if freshParent != nil && freshParent.Summary != nil && 
capturedSnapshot.Summary != nil {
+                       deltaSummary := Summary{
+                               Operation:  capturedSnapshot.Summary.Operation,
+                               Properties: 
maps.Clone(capturedSnapshot.Summary.Properties),
+                       }
+                       if s, sumErr := updateSnapshotSummaries(deltaSummary, 
freshParent.Summary.Properties); sumErr == nil {
+                               rebuilt.Summary = &s
+                       }
+               }
+
+               return &rebuilt, nil
+       }
+
+       addSnap := NewAddSnapshotUpdate(&snapshot)
+       addSnap.ownManifests = ownManifests
+       addSnap.rebuildManifestList = rebuildFn
+
        return []Update{
-                       NewAddSnapshotUpdate(&snapshot),
-                       NewSetSnapshotRefUpdate(branch, sp.snapshotID, 
BranchRef, -1, -1, -1),
+                       addSnap,
+                       // Use 0 (not -1) for the optional fields so they are 
omitted by
+                       // `omitempty` in JSON marshalling. -1 is a sentinel 
meaning
+                       // "no limit" internally, but strict catalogs such as 
AWS S3 Tables
+                       // reject a payload that explicitly contains negative 
values.
+                       NewSetSnapshotRefUpdate(branch, sp.snapshotID, 
BranchRef, 0, 0, 0),
                }, []Requirement{
                        AssertRefSnapshotID(branch, 
sp.txn.meta.currentSnapshotID),
                }, nil
diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go
index 03087f70..510efe96 100644
--- a/table/snapshot_producers_test.go
+++ b/table/snapshot_producers_test.go
@@ -40,6 +40,14 @@ type limitedWriteCloser struct {
        limit   int
        written int
        err     error
+
+       // parent and path, when both set, cause Close() to persist the
+       // successfully-written payload back into parent.files[path]. This makes
+       // the orphan/cleanup tests exercise the real write→delete cycle
+       // instead of pre-populating wfs.files with a placeholder.
+       parent *memIO
+       path   string
+       buf    bytes.Buffer
 }
 
 func (w *limitedWriteCloser) Write(p []byte) (int, error) {
@@ -47,11 +55,21 @@ func (w *limitedWriteCloser) Write(p []byte) (int, error) {
                return 0, w.err
        }
        w.written += len(p)
+       if w.parent != nil {
+               w.buf.Write(p)
+       }
 
        return len(p), nil
 }
 
 func (w *limitedWriteCloser) Close() error {
+       if w.parent == nil || w.path == "" {
+               return nil
+       }
+       w.parent.mu.Lock()
+       defer w.parent.mu.Unlock()
+       w.parent.files[w.path] = append([]byte(nil), w.buf.Bytes()...)
+
        return nil
 }
 
@@ -62,6 +80,7 @@ func (w *limitedWriteCloser) ReadFrom(r io.Reader) (int64, 
error) {
 type memIO struct {
        limit int
        err   error
+       mu    sync.Mutex
        files map[string][]byte
 }
 
@@ -74,7 +93,9 @@ func newMemIO(limit int, err error) *memIO {
 }
 
 func (m *memIO) Open(name string) (iceio.File, error) {
+       m.mu.Lock()
        data, ok := m.files[name]
+       m.mu.Unlock()
        if !ok {
                return nil, fs.ErrNotExist
        }
@@ -83,16 +104,20 @@ func (m *memIO) Open(name string) (iceio.File, error) {
 }
 
 func (m *memIO) Create(name string) (iceio.FileWriter, error) {
-       return &limitedWriteCloser{limit: m.limit, err: m.err}, nil
+       return &limitedWriteCloser{limit: m.limit, err: m.err, parent: m, path: 
name}, nil
 }
 
 func (m *memIO) WriteFile(name string, content []byte) error {
+       m.mu.Lock()
+       defer m.mu.Unlock()
        m.files[name] = append([]byte(nil), content...)
 
        return nil
 }
 
 func (m *memIO) Remove(name string) error {
+       m.mu.Lock()
+       defer m.mu.Unlock()
        delete(m.files, name)
 
        return nil
@@ -883,3 +908,63 @@ func TestFastAppendInheritsZeroCountManifests(t 
*testing.T) {
        }
        require.True(t, newManifestFound, "new snapshot must include a manifest 
written by snap2")
 }
+
+// TestComputeOwnManifests_NewTable verifies that when there is no parent
+// snapshot (parentSnapshotID == 0) all manifests are returned as-is with no 
error.
+func TestComputeOwnManifests_NewTable(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       io := newMemIO(1<<20, nil)
+       txn := createTestTransaction(t, io, spec)
+       sp := newFastAppendFilesProducer(OpAppend, txn, io, nil, nil)
+       // parentSnapshotID is 0 by default (new table) — all manifests belong 
to this producer.
+
+       got, err := sp.computeOwnManifests(nil)
+       require.NoError(t, err, "new table: computeOwnManifests must not error")
+       require.Nil(t, got, "new table: returned manifests must equal input")
+}
+
+// TestComputeOwnManifests_SnapshotByIDError verifies that when the parent
+// snapshot cannot be found an error is returned instead of silently claiming
+// all manifests as own.
+func TestComputeOwnManifests_SnapshotByIDError(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       io := newMemIO(1<<20, nil)
+       txn := createTestTransaction(t, io, spec)
+       sp := newFastAppendFilesProducer(OpAppend, txn, io, nil, nil)
+       sp.parentSnapshotID = 9999 // no such snapshot in metadata
+
+       got, err := sp.computeOwnManifests(nil)
+       require.Error(t, err, "unknown parent snapshot ID: must return error, 
not silent fallback")
+       require.ErrorIs(t, err, ErrSnapshotNotFound,
+               "production wraps ErrSnapshotNotFound; pin meaning via 
errors.Is so a regression "+
+                       "that swallows the lookup error and returns a 
programming-bug error would fail this test")
+       require.Nil(t, got, "error path must return nil manifest slice")
+}
+
+// TestComputeOwnManifests_ParentManifestsIOError verifies that when the parent
+// snapshot exists but its manifest list file cannot be read an error is 
returned
+// instead of silently claiming all manifests as own (which would cause 
duplicates
+// in the rebuilt manifest list).
+func TestComputeOwnManifests_ParentManifestsIOError(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       io := newMemIO(1<<20, nil)
+       txn := createTestTransaction(t, io, spec)
+       sp := newFastAppendFilesProducer(OpAppend, txn, io, nil, nil)
+
+       // Add a snapshot with a manifest list path that does not exist in the 
IO.
+       // parent.Manifests will fail with fs.ErrNotExist when it tries to open 
it.
+       parentID := int64(42)
+       txn.meta.snapshotList = append(txn.meta.snapshotList, Snapshot{
+               SnapshotID:   parentID,
+               ManifestList: 
"mem://default/table-location/metadata/ghost-manifest-list.avro",
+               Summary:      &Summary{Operation: OpAppend},
+       })
+       sp.parentSnapshotID = parentID
+
+       got, err := sp.computeOwnManifests(nil)
+       require.Error(t, err, "IO error reading parent manifests: must return 
error, not silent fallback")
+       require.ErrorIs(t, err, fs.ErrNotExist,
+               "production wraps the underlying IO error; pin meaning via 
errors.Is so a regression that "+
+                       "swallows the IO error and returns a programming-bug 
error would fail this test")
+       require.Nil(t, got, "error path must return nil manifest slice")
+}
diff --git a/table/table.go b/table/table.go
index a8ee06b1..c4c2a5de 100644
--- a/table/table.go
+++ b/table/table.go
@@ -51,16 +51,20 @@ import (
 // catalogs return their conflict errors raw and will not trigger
 // retries until follow-up work wires them through (tracked under
 // issue #830).
-//
-// The retry loop in doCommit re-issues the original updates and
-// requirements unchanged. This recovers only from transient catalog
-// errors (dropped connections, brief 409 during leader election); it
-// does not yet refresh the table metadata between attempts, so a
-// contended commit whose AssertRefSnapshotID requirement has been
-// invalidated by a peer will fail deterministically on every retry.
-// Refresh-and-replay is tracked separately (issue #830).
 var ErrCommitFailed = errors.New("commit failed, refresh and try again")
 
+// ErrWriteIORequired is returned by doCommit when the table's file system
+// does not implement io.WriteFileIO. Manifest-list rebuild on retry requires
+// write access; failing fast here is preferable to silently skipping the
+// rebuild and reintroducing the stale-parent data-loss bug. Callers that
+// need to detect this condition should use errors.Is(err, ErrWriteIORequired).
+var ErrWriteIORequired = errors.New("commit: file system does not implement WriteFileIO")
+
+// ErrSnapshotNotFound is returned (wrapped) by metadata lookups and by
+// computeOwnManifests when a snapshot ID does not exist in the table's
+// snapshot list. Tests pin meaning via errors.Is(err, ErrSnapshotNotFound).
+var ErrSnapshotNotFound = errors.New("snapshot not found")
+
 type FSysF func(ctx context.Context) (icebergio.IO, error)
 
 type Identifier = []string
@@ -384,12 +388,38 @@ func (t Table) doCommit(ctx context.Context, updates []Update, reqs []Requiremen
                return nil, err
        }
 
+       // Every real commit-path FS implements WriteFileIO. Failing here is
+       // preferable to silently skipping the manifest-list rebuild inside the
+       // retry loop — a skip reintroduces the original stale-parent data loss.
+       wfs, ok := fs.(icebergio.WriteFileIO)
+       if !ok {
+               return nil, fmt.Errorf("%w: manifest list rebuild requires write access", ErrWriteIORequired)
+       }
+
        var (
-               newMeta Metadata
-               newLoc  string
-               timer   *time.Timer
+               newMeta           Metadata
+               newLoc            string
+               timer             *time.Timer
+               orphanedManifests []string // manifest-list files orphaned by rebuilds
        )
 
+       // cleanupOrphans controls whether the defer below removes orphaned manifest-list
+       // files on exit. It defaults to true (clean on all safe exits) and is set to
+       // false only for the one unsafe case: a non-ErrCommitFailed error from
+       // CommitTable, where the catalog may have silently accepted the commit and one
+       // of the "orphaned" files may actually be the live snapshot.
+       cleanupOrphans := true
+       defer func() {
+               if !cleanupOrphans || len(orphanedManifests) == 0 {
+                       return
+               }
+               for _, path := range orphanedManifests {
+                       if removeErr := wfs.Remove(path); removeErr != nil {
+                               log.Printf("Warning: failed to delete orphaned manifest list %s: %v", path, removeErr)
+                       }
+               }
+       }()
+
        // current tracks the catalog state between retries. On attempt 0 it
        // equals t.metadata (so the conflict context's concurrent-snapshot
        // walk is empty and validators short-circuit). On subsequent
@@ -428,6 +458,18 @@ func (t Table) doCommit(ctx context.Context, updates []Update, reqs []Requiremen
                        }
                        current = fresh.metadata
                        reqs = rewriteRefSnapshotRequirements(reqs, co.branch, current)
+
+                       // Rebuild snapshot manifest lists to inherit all files committed
+                       // by concurrent writers since the snapshot was originally built.
+                       // Without this, the new snapshot's manifest list would only
+                       // contain its own files and callers scanning the current snapshot
+                       // would miss every concurrent writer's data.
+                       rebuiltUpdates, orphaned, rebuildErr := rebuildSnapshotUpdates(retryCtx, updates, current, co.branch, wfs, int(attempt))
+                       if rebuildErr != nil {
+                               return nil, fmt.Errorf("rebuild manifest list for retry attempt %d: %w", attempt, rebuildErr)
+                       }
+                       orphanedManifests = append(orphanedManifests, orphaned...)
+                       updates = rebuiltUpdates
                }
 
                // Pre-flight client-side conflict validation. Producers can
@@ -474,7 +516,11 @@ func (t Table) doCommit(ctx context.Context, updates []Update, reqs []Requiremen
                // Only retry on retryable commit conflicts. Unknown-state errors
                // (5xx, gateway timeouts) must NOT be retried because the commit
                // may have actually succeeded — retrying could duplicate work.
+               // Suppress orphan cleanup for the same reason: one of the orphaned
+               // manifest-list files may actually be the snapshot the catalog accepted.
                if !errors.Is(err, ErrCommitFailed) {
+                       cleanupOrphans = false
+
                        return nil, err
                }
        }
@@ -526,6 +572,66 @@ func rewriteRefSnapshotRequirements(reqs []Requirement, branch string, fresh Met
        return out
 }
 
+// rebuildSnapshotUpdates returns a new slice of updates where any
+// addSnapshotUpdate that carries a rebuildManifestList closure has its
+// snapshot regenerated to inherit all data files committed to the branch
+// since the original snapshot was built. Updates without a rebuild closure
+// pass through unchanged.
+//
+// It also returns the manifest-list file paths that were superseded by
+// the rebuild (i.e., the paths from the input updates that were replaced).
+// These become orphaned objects in object storage and should be removed
+// by the caller after a successful commit.
+//
+// This is the manifest-layer "refresh-and-replay" step: the data files
+// (already written to object storage) are reused as-is; only the manifest
+// list is rewritten to include the fresh parent's manifests so that the
+// rebuilt snapshot contains every committed file.
+func rebuildSnapshotUpdates(ctx context.Context, updates []Update, freshMeta Metadata, branch string, fs icebergio.WriteFileIO, attempt int) (rebuilt []Update, orphanedPaths []string, err error) {
+       // Determine the fresh branch head to use as the rebuilt snapshot's parent.
+       var freshHead *Snapshot
+       if branch != "" && freshMeta != nil {
+               freshHead = freshMeta.SnapshotByName(branch)
+       } else if freshMeta != nil {
+               freshHead = freshMeta.CurrentSnapshot()
+       }
+
+       result := make([]Update, len(updates))
+       copy(result, updates)
+
+       for i, u := range result {
+               su, ok := u.(*addSnapshotUpdate)
+               if !ok || su.rebuildManifestList == nil {
+                       continue
+               }
+
+               // Skip if the parent has not changed — saves an unnecessary S3 write.
+               if freshHead != nil && su.Snapshot.ParentSnapshotID != nil &&
+                       *su.Snapshot.ParentSnapshotID == freshHead.SnapshotID {
+                       continue
+               }
+
+               oldManifestList := su.Snapshot.ManifestList
+
+               newSnap, rebuildErr := su.rebuildManifestList(ctx, freshMeta, freshHead, fs, attempt)
+               if rebuildErr != nil {
+                       return nil, nil, rebuildErr
+               }
+
+               result[i] = &addSnapshotUpdate{
+                       baseUpdate:          su.baseUpdate,
+                       Snapshot:            newSnap,
+                       ownManifests:        su.ownManifests,
+                       rebuildManifestList: su.rebuildManifestList,
+               }
+
+               // The old manifest list is now an orphaned object in object storage.
+               orphanedPaths = append(orphanedPaths, oldManifestList)
+       }
+
+       return result, orphanedPaths, nil
+}
+
 type retryConfig struct {
        numRetries     uint
        minWaitMs      uint
diff --git a/table/updates.go b/table/updates.go
index 5dbdfb90..a1596a10 100644
--- a/table/updates.go
+++ b/table/updates.go
@@ -311,6 +311,18 @@ func (u *setDefaultSortOrderUpdate) Apply(builder *MetadataBuilder) error {
 type addSnapshotUpdate struct {
        baseUpdate
        Snapshot *Snapshot `json:"snapshot"`
+
+       // ownManifests holds the manifests written by this producer (those
+       // NOT inherited from the parent snapshot). Populated by
+       // snapshotProducer.commit and used by rebuildManifestList below.
+       ownManifests []iceberg.ManifestFile
+
+       // rebuildManifestList, when non-nil, regenerates the snapshot's
+       // manifest list to inherit from freshParent and combines it with
+       // ownManifests. Called by doCommit on every retry attempt so that
+       // each retry snapshot correctly inherits all files committed by
+       // concurrent writers since the original build.
+       rebuildManifestList func(ctx context.Context, freshMeta Metadata, freshParent *Snapshot, fio io.WriteFileIO, attempt int) (*Snapshot, error)
 }
 
 // NewAddSnapshotUpdate creates a new update that adds the given snapshot to the table metadata.
@@ -321,8 +333,14 @@ func NewAddSnapshotUpdate(snapshot *Snapshot) *addSnapshotUpdate {
        }
 }
 
+// Apply records this snapshot in the metadata builder. It delegates to
+// MetadataBuilder.AddSnapshotUpdate so that runtime-only fields
+// (ownManifests and rebuildManifestList) are preserved on the stored update
+// without reaching back into builder.updates after the fact. doCommit's retry
+// loop relies on these fields to regenerate the manifest list after an OCC
+// conflict.
 func (u *addSnapshotUpdate) Apply(builder *MetadataBuilder) error {
-       return builder.AddSnapshot(u.Snapshot)
+       return builder.AddSnapshotUpdate(u)
 }
 
 type setSnapshotRefUpdate struct {


Reply via email to