mzzz-zzm commented on code in PR #982:
URL: https://github.com/apache/iceberg-go/pull/982#discussion_r3203217216


##########
table/rebuild_manifest_test.go:
##########
@@ -0,0 +1,476 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+// newMetadataWithLastSeqNum builds a v2 Metadata whose last-sequence-number
+// equals lastSeqNum by grafting a synthetic snapshot at that sequence number.
+// Used by tests that need to control freshMeta.LastSequenceNumber().
+func newMetadataWithLastSeqNum(t *testing.T, lastSeqNum int64) Metadata {
+       t.Helper()
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+       meta, err := NewMetadata(schema, iceberg.UnpartitionedSpec, 
UnsortedSortOrder, "file:///tmp/seqtest",
+               iceberg.Properties{PropertyFormatVersion: "2"})
+       require.NoError(t, err)
+       builder, err := MetadataBuilderFromBase(meta, "")
+       require.NoError(t, err)
+       snap := Snapshot{
+               SnapshotID:     1000,
+               SequenceNumber: lastSeqNum,
+               TimestampMs:    meta.LastUpdatedMillis() + 1,
+               Summary:        &Summary{Operation: OpAppend},
+       }
+       require.NoError(t, builder.AddSnapshot(&snap))
+       require.NoError(t, builder.SetSnapshotRef(MainBranch, 1000, BranchRef))
+       out, err := builder.Build()
+       require.NoError(t, err)
+
+       return out
+}
+
+// newV3MetadataWithNextRowID builds a v3 Metadata whose NextRowID() returns
+// nextRowID by adding a synthetic snapshot that consumes that many rows.
+// Used by tests that need to control freshMeta.NextRowID() for v3 row lineage.
+func newV3MetadataWithNextRowID(t *testing.T, nextRowID int64) Metadata {
+       t.Helper()
+       spec := iceberg.NewPartitionSpec()
+       txn, _ := createTestTransactionWithMemIO(t, spec)
+       txn.meta.formatVersion = 3
+
+       firstRowID := int64(0)
+       addedRows := nextRowID
+       snap := Snapshot{
+               SnapshotID:     1000,
+               SequenceNumber: 1,
+               TimestampMs:    txn.meta.base.LastUpdatedMillis() + 1,
+               Summary:        &Summary{Operation: OpAppend},
+               FirstRowID:     &firstRowID,
+               AddedRows:      &addedRows,
+       }
+       require.NoError(t, txn.meta.AddSnapshot(&snap))
+       require.NoError(t, txn.meta.SetSnapshotRef(MainBranch, 1000, BranchRef))
+
+       meta, err := txn.meta.Build()
+       require.NoError(t, err)
+       require.Equal(t, nextRowID, meta.NextRowID())
+
+       return meta
+}
+
+// ptr returns a pointer to v, used in test helper expressions.
+// NOTE: ptr is also declared in pos_delete_partitioned_fanout_writer_test.go;
+// both files share the same package so only one declaration is needed.
+// This comment documents the shared usage — do not re-add a declaration here.
+
+// rebuildUpdate constructs an addSnapshotUpdate whose rebuildManifestList
+// closure simply records the freshParent it received and returns a new
+// snapshot whose ManifestList is the given newManifestList value.
+func rebuildUpdate(snap *Snapshot, newManifestList string, gotParent 
**Snapshot) *addSnapshotUpdate {
+       return &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
+               Snapshot:   snap,
+               rebuildManifestList: func(_ context.Context, _ Metadata, 
freshParent *Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+                       *gotParent = freshParent
+                       rebuilt := *snap
+                       rebuilt.ManifestList = newManifestList
+
+                       return &rebuilt, nil
+               },
+       }
+}
+
+// TestRebuildSnapshotUpdates_CallsClosureWithFreshParent verifies that
+// rebuildSnapshotUpdates invokes the rebuild closure and passes the fresh
+// branch head as freshParent.
+func TestRebuildSnapshotUpdates_CallsClosureWithFreshParent(t *testing.T) {
+       const oldManifest = "s3://bucket/old-manifest-list.avro"
+       const newManifest = "s3://bucket/new-manifest-list.avro"
+
+       // Build a fresh metadata that has a different snapshot than the one
+       // embedded in the update, so the parent-hasn't-changed guard is
+       // bypassed and the closure must be called.
+       freshHead := int64(42)
+       freshMeta := newConflictTestMetadata(t, &freshHead)
+
+       parentID := int64(7) // original snap's parent — does NOT match 
freshHead (42)
+       snap := &Snapshot{
+               SnapshotID:       99,
+               ParentSnapshotID: &parentID,
+               ManifestList:     oldManifest,
+               Summary:          &Summary{Operation: OpAppend},
+       }
+
+       var receivedParent *Snapshot
+       upd := rebuildUpdate(snap, newManifest, &receivedParent)
+
+       rebuilt, orphaned, err := rebuildSnapshotUpdates(
+               t.Context(),
+               []Update{upd},
+               freshMeta,
+               MainBranch,
+               iceio.LocalFS{},
+               1,
+       )
+       require.NoError(t, err)
+       require.Len(t, rebuilt, 1)
+       require.Len(t, orphaned, 1, "old manifest list must be recorded as 
orphaned")
+
+       // The closure should have been called with the branch's current head.
+       require.NotNil(t, receivedParent)
+       assert.Equal(t, freshHead, receivedParent.SnapshotID)
+
+       // The rebuilt update must carry the new manifest list.
+       addUpd, ok := rebuilt[0].(*addSnapshotUpdate)
+       require.True(t, ok)
+       assert.Equal(t, newManifest, addUpd.Snapshot.ManifestList)
+
+       // The superseded manifest list becomes an orphan.
+       assert.Equal(t, oldManifest, orphaned[0])
+}
+
+// TestRebuildSnapshotUpdates_SkipsWhenParentUnchanged verifies that
+// rebuildSnapshotUpdates skips the rebuild when the update's snapshot
+// already has the fresh branch head as its parent (no-op retry).
+func TestRebuildSnapshotUpdates_SkipsWhenParentUnchanged(t *testing.T) {
+       const manifest = "s3://bucket/manifest-list.avro"
+
+       freshHead := int64(42)
+       freshMeta := newConflictTestMetadata(t, &freshHead)
+
+       // Parent already equals the fresh head — rebuild must be skipped.
+       snap := &Snapshot{
+               SnapshotID:       99,
+               ParentSnapshotID: &freshHead, // same as fresh head
+               ManifestList:     manifest,
+               Summary:          &Summary{Operation: OpAppend},
+       }
+
+       called := false
+       upd := &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
+               Snapshot:   snap,
+               rebuildManifestList: func(_ context.Context, _ Metadata, _ 
*Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+                       called = true
+
+                       return snap, nil
+               },
+       }
+
+       rebuilt, orphaned, err := rebuildSnapshotUpdates(
+               t.Context(),
+               []Update{upd},
+               freshMeta,
+               MainBranch,
+               iceio.LocalFS{},
+               1,
+       )
+       require.NoError(t, err)
+       assert.False(t, called, "rebuild closure must not be called when parent 
is already up-to-date")
+       assert.Empty(t, orphaned, "no orphans when rebuild is skipped")
+       assert.Same(t, upd, rebuilt[0].(*addSnapshotUpdate), "original update 
must pass through unchanged")
+}
+
+// TestRebuildSnapshotUpdates_PassesThroughNonRebuildUpdates verifies that
+// updates without a rebuildManifestList closure are returned unmodified.
+func TestRebuildSnapshotUpdates_PassesThroughNonRebuildUpdates(t *testing.T) {
+       plainUpd := NewAddSnapshotUpdate(&Snapshot{
+               SnapshotID:   1,
+               ManifestList: "s3://bucket/no-rebuild.avro",
+               Summary:      &Summary{Operation: OpAppend},
+       })
+
+       // freshMeta may be nil — the plain update must not be touched.
+       rebuilt, orphaned, err := rebuildSnapshotUpdates(
+               t.Context(),
+               []Update{plainUpd},
+               nil,
+               MainBranch,
+               iceio.LocalFS{},
+               0,
+       )
+       require.NoError(t, err)
+       assert.Empty(t, orphaned)
+       assert.Same(t, plainUpd, rebuilt[0].(*addSnapshotUpdate))
+}
+
+// TestRebuildSnapshotUpdates_PropagatesClosureError verifies that an error
+// returned by the rebuild closure surfaces as the function's return error.
+func TestRebuildSnapshotUpdates_PropagatesClosureError(t *testing.T) {
+       freshHead := int64(5)
+       freshMeta := newConflictTestMetadata(t, &freshHead)
+
+       parent := int64(1)
+       snap := &Snapshot{
+               SnapshotID:       10,
+               ParentSnapshotID: &parent,
+               ManifestList:     "s3://bucket/old.avro",
+               Summary:          &Summary{Operation: OpAppend},
+       }
+       wantErr := errors.New("simulated S3 write failure")
+       upd := &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
+               Snapshot:   snap,
+               rebuildManifestList: func(_ context.Context, _ Metadata, _ 
*Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+                       return nil, wantErr
+               },
+       }
+
+       _, _, err := rebuildSnapshotUpdates(t.Context(), []Update{upd}, 
freshMeta, MainBranch, iceio.LocalFS{}, 1)
+       assert.ErrorIs(t, err, wantErr)
+}
+
+// ---------------------------------------------------------------------------
+// Fix 3 — newSeq derived from freshMeta.LastSequenceNumber()
+// ---------------------------------------------------------------------------
+
+// TestRebuildFn_SeqNumDerivedFromFreshMeta verifies that the rebuilt 
snapshot's
+// SequenceNumber equals freshMeta.LastSequenceNumber()+1, NOT
+// freshParent.SequenceNumber+1. A concurrent writer on a different branch can
+// advance the table-wide last-sequence-number without advancing this branch's
+// parent, so using freshParent.SequenceNumber+1 would violate the spec 
invariant.
+func TestRebuildFn_SeqNumDerivedFromFreshMeta(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       txn, wfs := createTestTransactionWithMemIO(t, spec)
+
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, _, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       require.NotNil(t, addSnap.rebuildManifestList, "rebuildManifestList 
closure must be set")
+
+       // Simulate a concurrent writer on another branch that bumped the global
+       // last-sequence-number to 99 without advancing this branch's parent.
+       // old code: newSeq = capturedSnapshot.SequenceNumber (stale, ≤ 99 — 
spec violation)
+       // new code: newSeq = freshMeta.LastSequenceNumber() + 1 = 100
+       freshMeta := newMetadataWithLastSeqNum(t, 99)
+       require.Equal(t, int64(99), freshMeta.LastSequenceNumber())
+
+       rebuilt, err := addSnap.rebuildManifestList(context.Background(), 
freshMeta, nil, wfs, 1)
+       require.NoError(t, err)
+       require.Equal(t, int64(100), rebuilt.SequenceNumber,
+               "SequenceNumber must be freshMeta.LastSequenceNumber()+1; using 
freshParent.SequenceNumber+1 violates the spec when another branch advanced the 
global counter")
+}
+
+// TestRebuildFn_SeqNumV1TableIsZero verifies that v1 tables keep 
SequenceNumber == 0
+// regardless of what freshMeta reports (v1 does not use sequence numbers).
+func TestRebuildFn_SeqNumV1TableIsZero(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       txn, wfs := createTestTransactionWithMemIO(t, spec)
+       txn.meta.formatVersion = 1 // override to v1
+
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, _, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       require.NotNil(t, addSnap.rebuildManifestList)
+
+       freshMeta := newMetadataWithLastSeqNum(t, 99)
+       rebuilt, err := addSnap.rebuildManifestList(context.Background(), 
freshMeta, nil, wfs, 1)
+       require.NoError(t, err)
+       require.Equal(t, int64(0), rebuilt.SequenceNumber, "v1 tables must 
always have SequenceNumber == 0")
+}
+
+// ---------------------------------------------------------------------------
+// Fix 1 — firstRowID derived from freshMeta.NextRowID()
+// ---------------------------------------------------------------------------
+
+// TestRebuildFn_V3FirstRowIDDerivedFromFreshMeta verifies that on a v3 table
+// the rebuilt snapshot's FirstRowID equals freshMeta.NextRowID() rather than
+// the hardcoded 0. If two writers race and the peer commits first, the
+// catalog's nextRowID has already advanced; using 0 would produce a
+// first-row-id that disagrees with the catalog's view and fails row-lineage
+// validation.
+func TestRebuildFn_V3FirstRowIDDerivedFromFreshMeta(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       txn, wfs := createTestTransactionWithMemIO(t, spec)
+       txn.meta.formatVersion = 3
+
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, _, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       require.NotNil(t, addSnap.rebuildManifestList)
+
+       // Simulate a concurrent writer that added 50 rows, advancing nextRowID 
to 50.
+       // old code: firstRowID = 0 (hardcoded)
+       // new code: firstRowID = freshMeta.NextRowID() = 50
+       freshMeta := newV3MetadataWithNextRowID(t, 50)
+       require.Equal(t, int64(50), freshMeta.NextRowID(), 
"freshMeta.NextRowID() must be 50")
+
+       rebuilt, err := addSnap.rebuildManifestList(context.Background(), 
freshMeta, nil, wfs, 1)
+       require.NoError(t, err)
+       require.NotNil(t, rebuilt.FirstRowID, "v3 rebuilt snapshot must have 
FirstRowID")
+       require.Equal(t, int64(50), *rebuilt.FirstRowID,
+               "FirstRowID must equal freshMeta.NextRowID(); hardcoded 0 would 
conflict with catalog's advanced nextRowID")
+
+       // AddedRows must be recomputed from writer.NextRowID() - firstRowID for
+       // this attempt (NOT carried over from capturedSnapshot at attempt 0).
+       // The single test data file has record count 1, so the producer's own
+       // contribution this attempt is exactly 1 row.
+       require.NotNil(t, rebuilt.AddedRows, "v3 rebuilt snapshot must have 
AddedRows")
+       require.Equal(t, int64(1), *rebuilt.AddedRows,

Review Comment:
   `TestRebuildFn_V3FirstRowIDDerivedFromFreshMeta` and 
`TestRebuildFn_V3FirstRowIDZeroWhenNilNextRowID` now overwrite the captured 
snapshot's `AddedRows` (`addSnap.Snapshot.AddedRows`, i.e. 
`capturedSnapshot.AddedRows` inside the closure) with a sentinel that 
**cannot** equal a correct recomputation:
   
   ```go
   require.NotNil(t, addSnap.Snapshot.AddedRows, "captured snapshot must have AddedRows on v3")
   sentinel := int64(999_999) // 888_888 in the NextRowID==0 variant
   *addSnap.Snapshot.AddedRows = sentinel
   
   // ... run rebuild ...
   
   require.NotEqual(t, sentinel, *rebuilt.AddedRows,
       "AddedRows must be recomputed; carrying the captured value (mutated to sentinel) would be a regression")
   require.Equal(t, int64(1), *rebuilt.AddedRows,
       "AddedRows must equal writer.NextRowID()-firstRowID (this producer's 1 data row)")
   ```
   
   A regression that reverts the closure to propagating 
`capturedSnapshot.AddedRows`, instead of recomputing it as `writer.NextRowID() - 
firstRowID`, would trip the `NotEqual(sentinel)` assertion. I verified this by 
reverting the recompute path locally: both tests fail as expected.



##########
table/rebuild_manifest_test.go:
##########
@@ -0,0 +1,476 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+// newMetadataWithLastSeqNum builds a v2 Metadata whose last-sequence-number
+// equals lastSeqNum by grafting a synthetic snapshot at that sequence number.
+// Used by tests that need to control freshMeta.LastSequenceNumber().
+func newMetadataWithLastSeqNum(t *testing.T, lastSeqNum int64) Metadata {
+       t.Helper()
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+       meta, err := NewMetadata(schema, iceberg.UnpartitionedSpec, 
UnsortedSortOrder, "file:///tmp/seqtest",
+               iceberg.Properties{PropertyFormatVersion: "2"})
+       require.NoError(t, err)
+       builder, err := MetadataBuilderFromBase(meta, "")
+       require.NoError(t, err)
+       snap := Snapshot{
+               SnapshotID:     1000,
+               SequenceNumber: lastSeqNum,
+               TimestampMs:    meta.LastUpdatedMillis() + 1,
+               Summary:        &Summary{Operation: OpAppend},
+       }
+       require.NoError(t, builder.AddSnapshot(&snap))
+       require.NoError(t, builder.SetSnapshotRef(MainBranch, 1000, BranchRef))
+       out, err := builder.Build()
+       require.NoError(t, err)
+
+       return out
+}
+
+// newV3MetadataWithNextRowID builds a v3 Metadata whose NextRowID() returns
+// nextRowID by adding a synthetic snapshot that consumes that many rows.
+// Used by tests that need to control freshMeta.NextRowID() for v3 row lineage.
+func newV3MetadataWithNextRowID(t *testing.T, nextRowID int64) Metadata {
+       t.Helper()
+       spec := iceberg.NewPartitionSpec()
+       txn, _ := createTestTransactionWithMemIO(t, spec)
+       txn.meta.formatVersion = 3
+
+       firstRowID := int64(0)
+       addedRows := nextRowID
+       snap := Snapshot{
+               SnapshotID:     1000,
+               SequenceNumber: 1,
+               TimestampMs:    txn.meta.base.LastUpdatedMillis() + 1,
+               Summary:        &Summary{Operation: OpAppend},
+               FirstRowID:     &firstRowID,
+               AddedRows:      &addedRows,
+       }
+       require.NoError(t, txn.meta.AddSnapshot(&snap))
+       require.NoError(t, txn.meta.SetSnapshotRef(MainBranch, 1000, BranchRef))
+
+       meta, err := txn.meta.Build()
+       require.NoError(t, err)
+       require.Equal(t, nextRowID, meta.NextRowID())
+
+       return meta
+}
+
+// ptr returns a pointer to v, used in test helper expressions.
+// NOTE: ptr is also declared in pos_delete_partitioned_fanout_writer_test.go;
+// both files share the same package so only one declaration is needed.
+// This comment documents the shared usage — do not re-add a declaration here.
+
+// rebuildUpdate constructs an addSnapshotUpdate whose rebuildManifestList
+// closure simply records the freshParent it received and returns a new
+// snapshot whose ManifestList is the given newManifestList value.
+func rebuildUpdate(snap *Snapshot, newManifestList string, gotParent 
**Snapshot) *addSnapshotUpdate {
+       return &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
+               Snapshot:   snap,
+               rebuildManifestList: func(_ context.Context, _ Metadata, 
freshParent *Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+                       *gotParent = freshParent
+                       rebuilt := *snap
+                       rebuilt.ManifestList = newManifestList
+
+                       return &rebuilt, nil
+               },
+       }
+}
+
+// TestRebuildSnapshotUpdates_CallsClosureWithFreshParent verifies that
+// rebuildSnapshotUpdates invokes the rebuild closure and passes the fresh
+// branch head as freshParent.
+func TestRebuildSnapshotUpdates_CallsClosureWithFreshParent(t *testing.T) {
+       const oldManifest = "s3://bucket/old-manifest-list.avro"
+       const newManifest = "s3://bucket/new-manifest-list.avro"
+
+       // Build a fresh metadata that has a different snapshot than the one
+       // embedded in the update, so the parent-hasn't-changed guard is
+       // bypassed and the closure must be called.
+       freshHead := int64(42)
+       freshMeta := newConflictTestMetadata(t, &freshHead)
+
+       parentID := int64(7) // original snap's parent — does NOT match 
freshHead (42)
+       snap := &Snapshot{
+               SnapshotID:       99,
+               ParentSnapshotID: &parentID,
+               ManifestList:     oldManifest,
+               Summary:          &Summary{Operation: OpAppend},
+       }
+
+       var receivedParent *Snapshot
+       upd := rebuildUpdate(snap, newManifest, &receivedParent)
+
+       rebuilt, orphaned, err := rebuildSnapshotUpdates(
+               t.Context(),
+               []Update{upd},
+               freshMeta,
+               MainBranch,
+               iceio.LocalFS{},
+               1,
+       )
+       require.NoError(t, err)
+       require.Len(t, rebuilt, 1)
+       require.Len(t, orphaned, 1, "old manifest list must be recorded as 
orphaned")
+
+       // The closure should have been called with the branch's current head.
+       require.NotNil(t, receivedParent)
+       assert.Equal(t, freshHead, receivedParent.SnapshotID)
+
+       // The rebuilt update must carry the new manifest list.
+       addUpd, ok := rebuilt[0].(*addSnapshotUpdate)
+       require.True(t, ok)
+       assert.Equal(t, newManifest, addUpd.Snapshot.ManifestList)
+
+       // The superseded manifest list becomes an orphan.
+       assert.Equal(t, oldManifest, orphaned[0])
+}
+
+// TestRebuildSnapshotUpdates_SkipsWhenParentUnchanged verifies that
+// rebuildSnapshotUpdates skips the rebuild when the update's snapshot
+// already has the fresh branch head as its parent (no-op retry).
+func TestRebuildSnapshotUpdates_SkipsWhenParentUnchanged(t *testing.T) {
+       const manifest = "s3://bucket/manifest-list.avro"
+
+       freshHead := int64(42)
+       freshMeta := newConflictTestMetadata(t, &freshHead)
+
+       // Parent already equals the fresh head — rebuild must be skipped.
+       snap := &Snapshot{
+               SnapshotID:       99,
+               ParentSnapshotID: &freshHead, // same as fresh head
+               ManifestList:     manifest,
+               Summary:          &Summary{Operation: OpAppend},
+       }
+
+       called := false
+       upd := &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
+               Snapshot:   snap,
+               rebuildManifestList: func(_ context.Context, _ Metadata, _ 
*Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+                       called = true
+
+                       return snap, nil
+               },
+       }
+
+       rebuilt, orphaned, err := rebuildSnapshotUpdates(
+               t.Context(),
+               []Update{upd},
+               freshMeta,
+               MainBranch,
+               iceio.LocalFS{},
+               1,
+       )
+       require.NoError(t, err)
+       assert.False(t, called, "rebuild closure must not be called when parent 
is already up-to-date")
+       assert.Empty(t, orphaned, "no orphans when rebuild is skipped")
+       assert.Same(t, upd, rebuilt[0].(*addSnapshotUpdate), "original update 
must pass through unchanged")
+}
+
+// TestRebuildSnapshotUpdates_PassesThroughNonRebuildUpdates verifies that
+// updates without a rebuildManifestList closure are returned unmodified.
+func TestRebuildSnapshotUpdates_PassesThroughNonRebuildUpdates(t *testing.T) {
+       plainUpd := NewAddSnapshotUpdate(&Snapshot{
+               SnapshotID:   1,
+               ManifestList: "s3://bucket/no-rebuild.avro",
+               Summary:      &Summary{Operation: OpAppend},
+       })
+
+       // freshMeta may be nil — the plain update must not be touched.
+       rebuilt, orphaned, err := rebuildSnapshotUpdates(
+               t.Context(),
+               []Update{plainUpd},
+               nil,
+               MainBranch,
+               iceio.LocalFS{},
+               0,
+       )
+       require.NoError(t, err)
+       assert.Empty(t, orphaned)
+       assert.Same(t, plainUpd, rebuilt[0].(*addSnapshotUpdate))
+}
+
+// TestRebuildSnapshotUpdates_PropagatesClosureError verifies that an error
+// returned by the rebuild closure surfaces as the function's return error.
+func TestRebuildSnapshotUpdates_PropagatesClosureError(t *testing.T) {
+       freshHead := int64(5)
+       freshMeta := newConflictTestMetadata(t, &freshHead)
+
+       parent := int64(1)
+       snap := &Snapshot{
+               SnapshotID:       10,
+               ParentSnapshotID: &parent,
+               ManifestList:     "s3://bucket/old.avro",
+               Summary:          &Summary{Operation: OpAppend},
+       }
+       wantErr := errors.New("simulated S3 write failure")
+       upd := &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
+               Snapshot:   snap,
+               rebuildManifestList: func(_ context.Context, _ Metadata, _ 
*Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+                       return nil, wantErr
+               },
+       }
+
+       _, _, err := rebuildSnapshotUpdates(t.Context(), []Update{upd}, 
freshMeta, MainBranch, iceio.LocalFS{}, 1)
+       assert.ErrorIs(t, err, wantErr)
+}
+
+// ---------------------------------------------------------------------------
+// Fix 3 — newSeq derived from freshMeta.LastSequenceNumber()
+// ---------------------------------------------------------------------------
+
+// TestRebuildFn_SeqNumDerivedFromFreshMeta verifies that the rebuilt 
snapshot's
+// SequenceNumber equals freshMeta.LastSequenceNumber()+1, NOT
+// freshParent.SequenceNumber+1. A concurrent writer on a different branch can
+// advance the table-wide last-sequence-number without advancing this branch's
+// parent, so using freshParent.SequenceNumber+1 would violate the spec 
invariant.
+func TestRebuildFn_SeqNumDerivedFromFreshMeta(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       txn, wfs := createTestTransactionWithMemIO(t, spec)
+
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, _, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       require.NotNil(t, addSnap.rebuildManifestList, "rebuildManifestList 
closure must be set")
+
+       // Simulate a concurrent writer on another branch that bumped the global
+       // last-sequence-number to 99 without advancing this branch's parent.
+       // old code: newSeq = capturedSnapshot.SequenceNumber (stale, ≤ 99 — 
spec violation)
+       // new code: newSeq = freshMeta.LastSequenceNumber() + 1 = 100
+       freshMeta := newMetadataWithLastSeqNum(t, 99)
+       require.Equal(t, int64(99), freshMeta.LastSequenceNumber())
+
+       rebuilt, err := addSnap.rebuildManifestList(context.Background(), 
freshMeta, nil, wfs, 1)

Review Comment:
   Fixed as suggested, with a small strengthening of the existing summary 
test (distinct fixture values plus one assertion).
   
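   `TestRebuildFn_SummaryRebasedAgainstFreshParent` already wires both a 
non-nil `freshParent` and a `freshMeta`. The fixture now gives 
`freshParent.SequenceNumber` (99) and `freshMeta.LastSequenceNumber()` (1) 
distinct values, so the source of the rebuilt sequence number is observable. A 
guard checks that they stay distinct, and a single assertion pins the expected 
value: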
   
   ```go
   freshParent := &Snapshot{
       SnapshotID:     freshParentID,
       SequenceNumber: 99,                       // would yield rebuilt.SequenceNumber == 100 under the buggy path
       ManifestList:   parentManifestListPath,
       Summary:        &Summary{Operation: OpAppend, Properties: freshParentSummary},
   }
   freshMeta := newMetadataWithLastSeqNum(t, 1)  // correct path yields rebuilt.SequenceNumber == 2
   require.NotEqual(t, freshParent.SequenceNumber, freshMeta.LastSequenceNumber(),
       "test fixture must keep these distinct so the seq-num source is observable")
   
   // ... run rebuild ...
   
   require.Equal(t, freshMeta.LastSequenceNumber()+1, rebuilt.SequenceNumber,
       "newSeq must derive from freshMeta.LastSequenceNumber(), not freshParent.SequenceNumber")
   ```
   
   `TestRebuildFn_SeqNumDerivedFromFreshMeta` (the `freshParent == nil` case) is 
unchanged; it remains the regression coverage for a v2 table with a nil parent.



##########
table/commit_retry_test.go:
##########
@@ -266,3 +307,244 @@ func TestReadRetryConfig_ClampsNegativeProperties(t 
*testing.T) {
        assert.Equal(t, uint(CommitMaxRetryWaitMsDefault), cfg.maxWaitMs)
        assert.Equal(t, uint(CommitTotalRetryTimeoutMsDefault), 
cfg.totalTimeoutMs)
 }
+
+// ---------------------------------------------------------------------------
+// Fix 5 — mandatory WriteFileIO check at top of doCommit
+// ---------------------------------------------------------------------------
+
+// TestDoCommit_NonWriteFileIOReturnsError verifies that doCommit fails
+// immediately when the table's file system does not implement WriteFileIO.
+// A silent skip would reuse the stale manifest list — exactly the bug
+// this PR was designed to fix.
+func TestDoCommit_NonWriteFileIOReturnsError(t *testing.T) {
+       cat := &flakyCatalog{}
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+       meta, err := NewMetadata(schema, iceberg.UnpartitionedSpec, 
UnsortedSortOrder, "file:///tmp/rotest",
+               iceberg.Properties{PropertyFormatVersion: "2"})
+       require.NoError(t, err)
+       cat.metadata = meta
+
+       tbl := New(
+               Identifier{"db", "ro-test"},
+               meta, "file:///tmp/rotest/v1.metadata.json",
+               func(context.Context) (iceio.IO, error) { return readOnlyIO{}, 
nil },
+               cat,
+       )
+
+       _, doErr := tbl.doCommit(t.Context(), nil, nil)
+       require.Error(t, doErr, "doCommit must fail when FS does not implement 
WriteFileIO")
+       assert.Contains(t, doErr.Error(), "WriteFileIO",
+               "error message must mention WriteFileIO so callers understand 
the requirement")
+       assert.Equal(t, int32(0), cat.attempts.Load(),
+               "CommitTable must not be called when FS check fails")
+}
+
+// ---------------------------------------------------------------------------
+// Fix 6 — orphan cleanup via defer
+// ---------------------------------------------------------------------------
+
+// newOCCTable creates a table that uses the given wfs for its FS and the given
+// catalog for commits. meta should include retry-config properties so that
+// doCommit's retry loop allows at least one retry.
+func newOCCTable(t *testing.T, meta Metadata, wfs iceio.WriteFileIO, cat 
CatalogIO) *Table {
+       t.Helper()
+
+       return New(
+               Identifier{"db", "occ-cleanup-test"},
+               meta,
+               "mem://default/table-location/metadata/v1.metadata.json",
+               func(context.Context) (iceio.IO, error) { return wfs, nil },
+               cat,
+       )
+}
+
+// newMemIOWithRetryMeta creates a test memIO and a matching table Metadata 
that
+// includes retry-config properties, so doCommit's retry loop allows retries.
+// The location matches createTestTransactionWithMemIO so they share the same
+// memIO for writing manifest files.
+func newMemIOWithRetryMeta(t *testing.T, spec iceberg.PartitionSpec) (*memIO, 
Metadata) {
+       t.Helper()
+       wfs := newMemIO(1<<20, nil)
+       schema := simpleSchema()
+       meta, err := NewMetadata(schema, &spec, UnsortedSortOrder, 
"mem://default/table-location",
+               iceberg.Properties{
+                       CommitNumRetriesKey:          "3",
+                       CommitMinRetryWaitMsKey:      "1",
+                       CommitMaxRetryWaitMsKey:      "2",
+                       CommitTotalRetryTimeoutMsKey: "60000",
+               })
+       require.NoError(t, err, "new metadata")
+
+       return wfs, meta
+}
+
+// TestDoCommit_OrphanCleanedOnSuccess verifies that manifest-list files
+// orphaned by OCC retries are removed after a successful commit. These files
+// are written during rebuild and must not leak on the happy path.
+func TestDoCommit_OrphanCleanedOnSuccess(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       // Build a transaction from the retry-enabled meta and commit via the 
producer.
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, reqs, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+
+       // Pre-populate the original manifest list path in the IO so that the
+       // defer's wfs.Remove call has a concrete entry to delete.
+       wfs.files[originalManifestList] = []byte("placeholder")
+
+       // Catalog: fail once with ErrCommitFailed (triggers rebuild that 
orphans
+       // originalManifestList), then succeed.
+       cat := &sequentialCatalog{
+               metadata: meta,
+               errs:     []error{ErrCommitFailed},
+       }
+       tbl = newOCCTable(t, meta, wfs, cat)
+
+       _, err = tbl.doCommit(t.Context(), updates, reqs, 
withCommitBranch(MainBranch))
+       require.NoError(t, err, "doCommit must succeed on the second attempt")
+
+       _, stillExists := wfs.files[originalManifestList]

Review Comment:
   `limitedWriteCloser.Close()` now persists its buffered payload back into the parent `memIO.files` under the existing mutex. As a result, the rebuilt manifest-list path is genuinely present in `wfs.files` after `doCommit` runs, and the pre-populated placeholder entry is no longer needed. All four `TestDoCommit_Orphan*` tests now assert the real contract:
   
   ```go
   // Orphan tests now do BOTH:
   //   (a) require the original ML was actually persisted before doCommit (real flow, not placeholder)
   //   (b) prove the live committed path was preserved
   require.Contains(t, wfs.files, originalManifestList,
       "producer.commit() must persist the manifest list via WriteFileIO before doCommit runs")
   
   // after doCommit success:
   committedSnap := cat.metadata.CurrentSnapshot()
   committedManifestList := committedSnap.ManifestList
   require.NotEqual(t, originalManifestList, committedManifestList,
       "committed manifest list must be the rebuilt path, not the original")
   require.Contains(t, wfs.files, committedManifestList,
       "live committed manifest list must be preserved by the cleanup defer")
   ```
   
   For `TestDoCommit_OrphanNotCleanedOnUnknownError` the assertion is symmetrical: at least two distinct manifest-list entries remain in `wfs.files`, so callers can recover whichever one the catalog silently accepted. A regression that flipped the cleanup loop to delete the committed path instead of the orphan would now fail assertion (b).



##########
table/commit_retry_test.go:
##########
@@ -266,3 +307,244 @@ func TestReadRetryConfig_ClampsNegativeProperties(t 
*testing.T) {
        assert.Equal(t, uint(CommitMaxRetryWaitMsDefault), cfg.maxWaitMs)
        assert.Equal(t, uint(CommitTotalRetryTimeoutMsDefault), 
cfg.totalTimeoutMs)
 }
+
+// ---------------------------------------------------------------------------
+// Fix 5 — mandatory WriteFileIO check at top of doCommit
+// ---------------------------------------------------------------------------
+
+// TestDoCommit_NonWriteFileIOReturnsError verifies that doCommit fails
+// immediately when the table's file system does not implement WriteFileIO.
+// A silent skip would reuse the stale manifest list — exactly the bug
+// this PR was designed to fix.
+func TestDoCommit_NonWriteFileIOReturnsError(t *testing.T) {
+       cat := &flakyCatalog{}
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+       meta, err := NewMetadata(schema, iceberg.UnpartitionedSpec, 
UnsortedSortOrder, "file:///tmp/rotest",
+               iceberg.Properties{PropertyFormatVersion: "2"})
+       require.NoError(t, err)
+       cat.metadata = meta
+
+       tbl := New(
+               Identifier{"db", "ro-test"},
+               meta, "file:///tmp/rotest/v1.metadata.json",
+               func(context.Context) (iceio.IO, error) { return readOnlyIO{}, 
nil },
+               cat,
+       )
+
+       _, doErr := tbl.doCommit(t.Context(), nil, nil)
+       require.Error(t, doErr, "doCommit must fail when FS does not implement 
WriteFileIO")
+       assert.Contains(t, doErr.Error(), "WriteFileIO",
+               "error message must mention WriteFileIO so callers understand 
the requirement")
+       assert.Equal(t, int32(0), cat.attempts.Load(),
+               "CommitTable must not be called when FS check fails")
+}
+
+// ---------------------------------------------------------------------------
+// Fix 6 — orphan cleanup via defer
+// ---------------------------------------------------------------------------
+
+// newOCCTable creates a table that uses the given wfs for its FS and the given
+// catalog for commits. meta should include retry-config properties so that
+// doCommit's retry loop allows at least one retry.
+func newOCCTable(t *testing.T, meta Metadata, wfs iceio.WriteFileIO, cat 
CatalogIO) *Table {
+       t.Helper()
+
+       return New(
+               Identifier{"db", "occ-cleanup-test"},
+               meta,
+               "mem://default/table-location/metadata/v1.metadata.json",
+               func(context.Context) (iceio.IO, error) { return wfs, nil },
+               cat,
+       )
+}
+
+// newMemIOWithRetryMeta creates a test memIO and a matching table Metadata 
that
+// includes retry-config properties, so doCommit's retry loop allows retries.
+// The location matches createTestTransactionWithMemIO so they share the same
+// memIO for writing manifest files.
+func newMemIOWithRetryMeta(t *testing.T, spec iceberg.PartitionSpec) (*memIO, 
Metadata) {
+       t.Helper()
+       wfs := newMemIO(1<<20, nil)
+       schema := simpleSchema()
+       meta, err := NewMetadata(schema, &spec, UnsortedSortOrder, 
"mem://default/table-location",
+               iceberg.Properties{
+                       CommitNumRetriesKey:          "3",
+                       CommitMinRetryWaitMsKey:      "1",
+                       CommitMaxRetryWaitMsKey:      "2",
+                       CommitTotalRetryTimeoutMsKey: "60000",
+               })
+       require.NoError(t, err, "new metadata")
+
+       return wfs, meta
+}
+
+// TestDoCommit_OrphanCleanedOnSuccess verifies that manifest-list files
+// orphaned by OCC retries are removed after a successful commit. These files
+// are written during rebuild and must not leak on the happy path.
+func TestDoCommit_OrphanCleanedOnSuccess(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       // Build a transaction from the retry-enabled meta and commit via the 
producer.
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, reqs, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+
+       // Pre-populate the original manifest list path in the IO so that the
+       // defer's wfs.Remove call has a concrete entry to delete.
+       wfs.files[originalManifestList] = []byte("placeholder")
+
+       // Catalog: fail once with ErrCommitFailed (triggers rebuild that 
orphans
+       // originalManifestList), then succeed.
+       cat := &sequentialCatalog{
+               metadata: meta,
+               errs:     []error{ErrCommitFailed},
+       }
+       tbl = newOCCTable(t, meta, wfs, cat)
+
+       _, err = tbl.doCommit(t.Context(), updates, reqs, 
withCommitBranch(MainBranch))
+       require.NoError(t, err, "doCommit must succeed on the second attempt")
+
+       _, stillExists := wfs.files[originalManifestList]
+       assert.False(t, stillExists,
+               "orphaned manifest list must be removed after successful 
commit")
+}
+
+// TestDoCommit_OrphanNotCleanedOnUnknownError verifies that manifest-list
+// files are NOT removed when CommitTable returns an unknown 
non-ErrCommitFailed
+// error (5xx / gateway timeout). In that case the catalog may have silently
+// accepted the commit, meaning one of the "orphaned" files is actually the
+// live snapshot. Deleting it would permanently corrupt the table.
+func TestDoCommit_OrphanNotCleanedOnUnknownError(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, reqs, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+
+       wfs.files[originalManifestList] = []byte("placeholder")
+
+       unknown5xxErr := errors.New("simulated 5xx: internal server error")
+       // Catalog: fail once (ErrCommitFailed → rebuild → orphan created),
+       // then return a 5xx (non-ErrCommitFailed → cleanupOrphans=false).
+       cat := &sequentialCatalog{
+               metadata: meta,
+               errs:     []error{ErrCommitFailed, unknown5xxErr},
+       }
+       tbl = newOCCTable(t, meta, wfs, cat)
+
+       _, err = tbl.doCommit(t.Context(), updates, reqs, 
withCommitBranch(MainBranch))
+       require.ErrorIs(t, err, unknown5xxErr, "5xx error must propagate")
+
+       _, stillExists := wfs.files[originalManifestList]
+       assert.True(t, stillExists,
+               "orphaned manifest list must NOT be removed when commit outcome 
is unknown (5xx)")
+}
+
+// TestDoCommit_OrphanCleanedOnCommitDiverged verifies that manifest-list files
+// orphaned by rebuild attempts are removed when ErrCommitDiverged is returned
+// by a conflict validator. Diverged commits are terminal (no retry), and since
+// neither of the orphaned files was ever accepted by the catalog, they are 
safe
+// to delete. The defer cleanup runs with cleanupOrphans=true on this path.
+func TestDoCommit_OrphanCleanedOnCommitDiverged(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       // freshMeta has a snapshot on MainBranch so validators run on the 
retry attempt.
+       freshID := int64(42)
+       freshMeta := newConflictTestMetadataWithProps(t, &freshID, 
iceberg.Properties{
+               CommitNumRetriesKey:          "3",
+               CommitMinRetryWaitMsKey:      "1",
+               CommitMaxRetryWaitMsKey:      "2",
+               CommitTotalRetryTimeoutMsKey: "60000",
+       })
+
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, reqs, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+
+       wfs.files[originalManifestList] = []byte("placeholder")
+
+       // Catalog: fail once (ErrCommitFailed → rebuild → orphan created).
+       // LoadTable returns freshMeta (has branch snapshot → validators run on 
retry).
+       // Validator returns ErrCommitDiverged immediately.
+       cat := &sequentialCatalog{
+               metadata: meta,
+               loadMeta: freshMeta,
+               errs:     []error{ErrCommitFailed},
+       }
+       tbl = newOCCTable(t, meta, wfs, cat)
+
+       divergedValidator := func(*conflictContext) error { return 
ErrCommitDiverged }
+
+       _, err = tbl.doCommit(t.Context(), updates, reqs,
+               withCommitBranch(MainBranch),
+               withCommitValidators(divergedValidator),
+       )
+       require.ErrorIs(t, err, ErrCommitDiverged)
+
+       // The defer must have fired with cleanupOrphans=true and removed the 
orphan.
+       _, stillExists := wfs.files[originalManifestList]
+       assert.False(t, stillExists,
+               "orphaned manifest list must be removed even on 
ErrCommitDiverged: "+
+                       "the file was never accepted by the catalog so it is 
safe to delete")
+}
+
+// TestDoCommit_OrphanCleanedOnRetriesExhausted verifies that when every retry
+// attempt fails with ErrCommitFailed and the loop exits with the budget
+// exhausted, the defer still fires with cleanupOrphans=true. None of the
+// orphaned manifest-list files were ever accepted by the catalog, so they are
+// safe to delete on this terminal exit.
+func TestDoCommit_OrphanCleanedOnRetriesExhausted(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, reqs, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+
+       wfs.files[originalManifestList] = []byte("placeholder")
+
+       // numRetries=3 → 4 attempts; every attempt fails with ErrCommitFailed.
+       cat := &sequentialCatalog{
+               metadata: meta,
+               errs:     []error{ErrCommitFailed, ErrCommitFailed, 
ErrCommitFailed, ErrCommitFailed},

Review Comment:
   Added `TestDoCommit_RetryProgressesFreshMeta` (in `commit_retry_test.go`) 
that drives a **real producer commit** with `rebuildManifestList` closure 
attached through 2 `ErrCommitFailed` retries + 1 success. The new 
`progressingRebuildCatalog`:
   
   - writes a real (empty) manifest-list file at a unique path on every peer 
graft, so the rebuild closure's `freshParent.Manifests(fio)` succeeds on each 
retry;
   - publishes a `Summary` with cumulative `total-data-files` on each peer, so 
the rebuild's summary recomputation observes an evolving parent;
   - applies the accepted updates through `MetadataBuilder` so the test can 
read the live committed snapshot.
   
   A single test now exercises every Round 1 fix at once:
   
   ```go
   // (a) commitTableCalls: 2 fails + 1 success.
   require.Equal(t, int32(3), cat.commitTableCalls.Load())
   
   // (b) Orphan list grows by N — every retry submits a UNIQUE manifest list path.
   seen := make(map[string]struct{}, 3)
   for i, p := range cat.observedManifestLists { /* assert distinct */ }
   require.Equal(t, originalManifestList, cat.observedManifestLists[0])
   
   // (c) freshMeta.LastSequenceNumber() observed at LoadTable strictly advances.
   require.Greater(t, cat.loadedLastSeqNums[1], cat.loadedLastSeqNums[0])
   
   // (d) Fix 3 — submitted seq numbers equal freshMeta.LastSequenceNumber()+1 per retry.
   require.Equal(t, cat.loadedLastSeqNums[0]+1, cat.observedSeqNums[1])
   require.Equal(t, cat.loadedLastSeqNums[1]+1, cat.observedSeqNums[2])
   
   // (e) Fix 2 — summary chains on top of evolving freshParent.
   require.Equal(t, 3, gotDataFiles,
       "committed total-data-files must = peer cumulative (2) + producer (1) = 3; "+
           "a regression that ignored freshParent.Summary would publish 1")
   
   // (f) Fix 6 — defer cleanup: live committed ML preserved; orphans removed.
   require.Contains(t, wfs.files, committedML)
   require.NotContains(t, wfs.files, originalManifestList)
   require.NotContains(t, wfs.files, cat.observedManifestLists[1])
   ```
   
   Note on helper choice: `headTrackingCatalog` advances metadata only on a 
*successful* `CommitTable`; the ≥2-retry-progression scenario requires the 
catalog state to advance **between failed retries**. 
`progressingRebuildCatalog` adds exactly that capability while keeping the 
validate-and-track invariants minimal.
   
   (An earlier version of this test passed `nil` updates to `doCommit`, which 
never invoked `rebuildSnapshotUpdates` — only the retry-loop refresh path was 
exercised. `e35d2bd` corrects this so the rebuild closure is exercised for real 
on every retry, which is what was actually requested.)



##########
table/commit_retry_test.go:
##########
@@ -266,3 +307,244 @@ func TestReadRetryConfig_ClampsNegativeProperties(t 
*testing.T) {
        assert.Equal(t, uint(CommitMaxRetryWaitMsDefault), cfg.maxWaitMs)
        assert.Equal(t, uint(CommitTotalRetryTimeoutMsDefault), 
cfg.totalTimeoutMs)
 }
+
+// ---------------------------------------------------------------------------
+// Fix 5 — mandatory WriteFileIO check at top of doCommit
+// ---------------------------------------------------------------------------
+
+// TestDoCommit_NonWriteFileIOReturnsError verifies that doCommit fails
+// immediately when the table's file system does not implement WriteFileIO.
+// A silent skip would reuse the stale manifest list — exactly the bug
+// this PR was designed to fix.
+func TestDoCommit_NonWriteFileIOReturnsError(t *testing.T) {
+       cat := &flakyCatalog{}
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+       meta, err := NewMetadata(schema, iceberg.UnpartitionedSpec, 
UnsortedSortOrder, "file:///tmp/rotest",
+               iceberg.Properties{PropertyFormatVersion: "2"})
+       require.NoError(t, err)
+       cat.metadata = meta
+
+       tbl := New(
+               Identifier{"db", "ro-test"},
+               meta, "file:///tmp/rotest/v1.metadata.json",
+               func(context.Context) (iceio.IO, error) { return readOnlyIO{}, 
nil },
+               cat,
+       )
+
+       _, doErr := tbl.doCommit(t.Context(), nil, nil)
+       require.Error(t, doErr, "doCommit must fail when FS does not implement 
WriteFileIO")
+       assert.Contains(t, doErr.Error(), "WriteFileIO",

Review Comment:
   `table/table.go` now exposes a sentinel:
   
   ```go
   // ErrWriteIORequired is returned by doCommit when the table's file system
   // does not implement icebergio.WriteFileIO. Callers can detect this via
   // errors.Is so the test does not depend on the rendered error message.
   var ErrWriteIORequired = errors.New("commit: file system does not implement WriteFileIO")
   
   // ... in doCommit:
   return nil, fmt.Errorf("%w: manifest list rebuild requires write access", ErrWriteIORequired)
   ```
   
   `TestDoCommit_NonWriteFileIOReturnsError` uses `require.ErrorIs(t, doErr, 
ErrWriteIORequired)` and the substring assertion is removed. The two 
`TestComputeOwnManifests_*Error` tests received the same treatment (you flagged 
them as the same shape):
   
   ```go
   require.ErrorIs(t, err, ErrSnapshotNotFound,
       "production wraps ErrSnapshotNotFound; pin meaning via errors.Is so a regression "+
           "that swallows the lookup error and returns a programming-bug error would fail this test")
   
   require.ErrorIs(t, err, fs.ErrNotExist,
       "production wraps the underlying IO error; pin meaning via errors.Is so a regression that "+
           "swallows the IO error and returns a programming-bug error would fail this test")
   ```
   
   `ErrSnapshotNotFound` is the sentinel already wrapped by `MetadataBuilder.SnapshotByID`, so no new sentinel was needed for `TestComputeOwnManifests_SnapshotByIDError`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to