laskoviymishka commented on code in PR #982:
URL: https://github.com/apache/iceberg-go/pull/982#discussion_r3202509210


##########
table/rebuild_manifest_test.go:
##########
@@ -0,0 +1,476 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+// newMetadataWithLastSeqNum builds a v2 Metadata whose last-sequence-number
+// equals lastSeqNum by grafting a synthetic snapshot at that sequence number.
+// Used by tests that need to control freshMeta.LastSequenceNumber().
+func newMetadataWithLastSeqNum(t *testing.T, lastSeqNum int64) Metadata {
+       t.Helper()
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+       meta, err := NewMetadata(schema, iceberg.UnpartitionedSpec, 
UnsortedSortOrder, "file:///tmp/seqtest",
+               iceberg.Properties{PropertyFormatVersion: "2"})
+       require.NoError(t, err)
+       builder, err := MetadataBuilderFromBase(meta, "")
+       require.NoError(t, err)
+       snap := Snapshot{
+               SnapshotID:     1000,
+               SequenceNumber: lastSeqNum,
+               TimestampMs:    meta.LastUpdatedMillis() + 1,
+               Summary:        &Summary{Operation: OpAppend},
+       }
+       require.NoError(t, builder.AddSnapshot(&snap))
+       require.NoError(t, builder.SetSnapshotRef(MainBranch, 1000, BranchRef))
+       out, err := builder.Build()
+       require.NoError(t, err)
+
+       return out
+}
+
+// newV3MetadataWithNextRowID builds a v3 Metadata whose NextRowID() returns
+// nextRowID by adding a synthetic snapshot that consumes that many rows.
+// Used by tests that need to control freshMeta.NextRowID() for v3 row lineage.
+func newV3MetadataWithNextRowID(t *testing.T, nextRowID int64) Metadata {
+       t.Helper()
+       spec := iceberg.NewPartitionSpec()
+       txn, _ := createTestTransactionWithMemIO(t, spec)
+       txn.meta.formatVersion = 3
+
+       firstRowID := int64(0)
+       addedRows := nextRowID
+       snap := Snapshot{
+               SnapshotID:     1000,
+               SequenceNumber: 1,
+               TimestampMs:    txn.meta.base.LastUpdatedMillis() + 1,
+               Summary:        &Summary{Operation: OpAppend},
+               FirstRowID:     &firstRowID,
+               AddedRows:      &addedRows,
+       }
+       require.NoError(t, txn.meta.AddSnapshot(&snap))
+       require.NoError(t, txn.meta.SetSnapshotRef(MainBranch, 1000, BranchRef))
+
+       meta, err := txn.meta.Build()
+       require.NoError(t, err)
+       require.Equal(t, nextRowID, meta.NextRowID())
+
+       return meta
+}
+
+// ptr returns a pointer to v, used in test helper expressions.
+// NOTE: ptr is also declared in pos_delete_partitioned_fanout_writer_test.go;
+// both files share the same package so only one declaration is needed.
+// This comment documents the shared usage — do not re-add a declaration here.
+
+// rebuildUpdate constructs an addSnapshotUpdate whose rebuildManifestList
+// closure simply records the freshParent it received and returns a new
+// snapshot whose ManifestList is the given newManifestList value.
+func rebuildUpdate(snap *Snapshot, newManifestList string, gotParent 
**Snapshot) *addSnapshotUpdate {
+       return &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
+               Snapshot:   snap,
+               rebuildManifestList: func(_ context.Context, _ Metadata, 
freshParent *Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+                       *gotParent = freshParent
+                       rebuilt := *snap
+                       rebuilt.ManifestList = newManifestList
+
+                       return &rebuilt, nil
+               },
+       }
+}
+
+// TestRebuildSnapshotUpdates_CallsClosureWithFreshParent verifies that
+// rebuildSnapshotUpdates invokes the rebuild closure and passes the fresh
+// branch head as freshParent.
+func TestRebuildSnapshotUpdates_CallsClosureWithFreshParent(t *testing.T) {
+       const oldManifest = "s3://bucket/old-manifest-list.avro"
+       const newManifest = "s3://bucket/new-manifest-list.avro"
+
+       // Build a fresh metadata that has a different snapshot than the one
+       // embedded in the update, so the parent-hasn't-changed guard is
+       // bypassed and the closure must be called.
+       freshHead := int64(42)
+       freshMeta := newConflictTestMetadata(t, &freshHead)
+
+       parentID := int64(7) // original snap's parent — does NOT match 
freshHead (42)
+       snap := &Snapshot{
+               SnapshotID:       99,
+               ParentSnapshotID: &parentID,
+               ManifestList:     oldManifest,
+               Summary:          &Summary{Operation: OpAppend},
+       }
+
+       var receivedParent *Snapshot
+       upd := rebuildUpdate(snap, newManifest, &receivedParent)
+
+       rebuilt, orphaned, err := rebuildSnapshotUpdates(
+               t.Context(),
+               []Update{upd},
+               freshMeta,
+               MainBranch,
+               iceio.LocalFS{},
+               1,
+       )
+       require.NoError(t, err)
+       require.Len(t, rebuilt, 1)
+       require.Len(t, orphaned, 1, "old manifest list must be recorded as 
orphaned")
+
+       // The closure should have been called with the branch's current head.
+       require.NotNil(t, receivedParent)
+       assert.Equal(t, freshHead, receivedParent.SnapshotID)
+
+       // The rebuilt update must carry the new manifest list.
+       addUpd, ok := rebuilt[0].(*addSnapshotUpdate)
+       require.True(t, ok)
+       assert.Equal(t, newManifest, addUpd.Snapshot.ManifestList)
+
+       // The superseded manifest list becomes an orphan.
+       assert.Equal(t, oldManifest, orphaned[0])
+}
+
+// TestRebuildSnapshotUpdates_SkipsWhenParentUnchanged verifies that
+// rebuildSnapshotUpdates skips the rebuild when the update's snapshot
+// already has the fresh branch head as its parent (no-op retry).
+func TestRebuildSnapshotUpdates_SkipsWhenParentUnchanged(t *testing.T) {
+       const manifest = "s3://bucket/manifest-list.avro"
+
+       freshHead := int64(42)
+       freshMeta := newConflictTestMetadata(t, &freshHead)
+
+       // Parent already equals the fresh head — rebuild must be skipped.
+       snap := &Snapshot{
+               SnapshotID:       99,
+               ParentSnapshotID: &freshHead, // same as fresh head
+               ManifestList:     manifest,
+               Summary:          &Summary{Operation: OpAppend},
+       }
+
+       called := false
+       upd := &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
+               Snapshot:   snap,
+               rebuildManifestList: func(_ context.Context, _ Metadata, _ 
*Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+                       called = true
+
+                       return snap, nil
+               },
+       }
+
+       rebuilt, orphaned, err := rebuildSnapshotUpdates(
+               t.Context(),
+               []Update{upd},
+               freshMeta,
+               MainBranch,
+               iceio.LocalFS{},
+               1,
+       )
+       require.NoError(t, err)
+       assert.False(t, called, "rebuild closure must not be called when parent 
is already up-to-date")
+       assert.Empty(t, orphaned, "no orphans when rebuild is skipped")
+       assert.Same(t, upd, rebuilt[0].(*addSnapshotUpdate), "original update 
must pass through unchanged")
+}
+
+// TestRebuildSnapshotUpdates_PassesThroughNonRebuildUpdates verifies that
+// updates without a rebuildManifestList closure are returned unmodified.
+func TestRebuildSnapshotUpdates_PassesThroughNonRebuildUpdates(t *testing.T) {
+       plainUpd := NewAddSnapshotUpdate(&Snapshot{
+               SnapshotID:   1,
+               ManifestList: "s3://bucket/no-rebuild.avro",
+               Summary:      &Summary{Operation: OpAppend},
+       })
+
+       // freshMeta may be nil — the plain update must not be touched.
+       rebuilt, orphaned, err := rebuildSnapshotUpdates(
+               t.Context(),
+               []Update{plainUpd},
+               nil,
+               MainBranch,
+               iceio.LocalFS{},
+               0,
+       )
+       require.NoError(t, err)
+       assert.Empty(t, orphaned)
+       assert.Same(t, plainUpd, rebuilt[0].(*addSnapshotUpdate))
+}
+
+// TestRebuildSnapshotUpdates_PropagatesClosureError verifies that an error
+// returned by the rebuild closure surfaces as the function's return error.
+func TestRebuildSnapshotUpdates_PropagatesClosureError(t *testing.T) {
+       freshHead := int64(5)
+       freshMeta := newConflictTestMetadata(t, &freshHead)
+
+       parent := int64(1)
+       snap := &Snapshot{
+               SnapshotID:       10,
+               ParentSnapshotID: &parent,
+               ManifestList:     "s3://bucket/old.avro",
+               Summary:          &Summary{Operation: OpAppend},
+       }
+       wantErr := errors.New("simulated S3 write failure")
+       upd := &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
+               Snapshot:   snap,
+               rebuildManifestList: func(_ context.Context, _ Metadata, _ 
*Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+                       return nil, wantErr
+               },
+       }
+
+       _, _, err := rebuildSnapshotUpdates(t.Context(), []Update{upd}, 
freshMeta, MainBranch, iceio.LocalFS{}, 1)
+       assert.ErrorIs(t, err, wantErr)
+}
+
+// ---------------------------------------------------------------------------
+// Fix 3 — newSeq derived from freshMeta.LastSequenceNumber()
+// ---------------------------------------------------------------------------
+
+// TestRebuildFn_SeqNumDerivedFromFreshMeta verifies that the rebuilt 
snapshot's
+// SequenceNumber equals freshMeta.LastSequenceNumber()+1, NOT
+// freshParent.SequenceNumber+1. A concurrent writer on a different branch can
+// advance the table-wide last-sequence-number without advancing this branch's
+// parent, so using freshParent.SequenceNumber+1 would violate the spec 
invariant.
+func TestRebuildFn_SeqNumDerivedFromFreshMeta(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       txn, wfs := createTestTransactionWithMemIO(t, spec)
+
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, _, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       require.NotNil(t, addSnap.rebuildManifestList, "rebuildManifestList 
closure must be set")
+
+       // Simulate a concurrent writer on another branch that bumped the global
+       // last-sequence-number to 99 without advancing this branch's parent.
+       // old code: newSeq = capturedSnapshot.SequenceNumber (stale, ≤ 99 — 
spec violation)
+       // new code: newSeq = freshMeta.LastSequenceNumber() + 1 = 100
+       freshMeta := newMetadataWithLastSeqNum(t, 99)
+       require.Equal(t, int64(99), freshMeta.LastSequenceNumber())
+
+       rebuilt, err := addSnap.rebuildManifestList(context.Background(), 
freshMeta, nil, wfs, 1)
+       require.NoError(t, err)
+       require.Equal(t, int64(100), rebuilt.SequenceNumber,
+               "SequenceNumber must be freshMeta.LastSequenceNumber()+1; using 
freshParent.SequenceNumber+1 violates the spec when another branch advanced the 
global counter")
+}
+
+// TestRebuildFn_SeqNumV1TableIsZero verifies that v1 tables keep 
SequenceNumber == 0
+// regardless of what freshMeta reports (v1 does not use sequence numbers).
+func TestRebuildFn_SeqNumV1TableIsZero(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       txn, wfs := createTestTransactionWithMemIO(t, spec)
+       txn.meta.formatVersion = 1 // override to v1
+
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, _, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       require.NotNil(t, addSnap.rebuildManifestList)
+
+       freshMeta := newMetadataWithLastSeqNum(t, 99)
+       rebuilt, err := addSnap.rebuildManifestList(context.Background(), 
freshMeta, nil, wfs, 1)
+       require.NoError(t, err)
+       require.Equal(t, int64(0), rebuilt.SequenceNumber, "v1 tables must 
always have SequenceNumber == 0")
+}
+
+// ---------------------------------------------------------------------------
+// Fix 1 — firstRowID derived from freshMeta.NextRowID()
+// ---------------------------------------------------------------------------
+
+// TestRebuildFn_V3FirstRowIDDerivedFromFreshMeta verifies that on a v3 table
+// the rebuilt snapshot's FirstRowID equals freshMeta.NextRowID() rather than
+// the hardcoded 0. If two writers race and the peer commits first, the
+// catalog's nextRowID has already advanced; using 0 would produce a
+// first-row-id that disagrees with the catalog's view and fails row-lineage
+// validation.
+func TestRebuildFn_V3FirstRowIDDerivedFromFreshMeta(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       txn, wfs := createTestTransactionWithMemIO(t, spec)
+       txn.meta.formatVersion = 3
+
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, _, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       require.NotNil(t, addSnap.rebuildManifestList)
+
+       // Simulate a concurrent writer that added 50 rows, advancing nextRowID 
to 50.
+       // old code: firstRowID = 0 (hardcoded)
+       // new code: firstRowID = freshMeta.NextRowID() = 50
+       freshMeta := newV3MetadataWithNextRowID(t, 50)
+       require.Equal(t, int64(50), freshMeta.NextRowID(), 
"freshMeta.NextRowID() must be 50")
+
+       rebuilt, err := addSnap.rebuildManifestList(context.Background(), 
freshMeta, nil, wfs, 1)
+       require.NoError(t, err)
+       require.NotNil(t, rebuilt.FirstRowID, "v3 rebuilt snapshot must have 
FirstRowID")
+       require.Equal(t, int64(50), *rebuilt.FirstRowID,
+               "FirstRowID must equal freshMeta.NextRowID(); hardcoded 0 would 
conflict with catalog's advanced nextRowID")
+
+       // AddedRows must be recomputed from writer.NextRowID() - firstRowID for
+       // this attempt (NOT carried over from capturedSnapshot at attempt 0).
+       // The single test data file has record count 1, so the producer's own
+       // contribution this attempt is exactly 1 row.
+       require.NotNil(t, rebuilt.AddedRows, "v3 rebuilt snapshot must have 
AddedRows")
+       require.Equal(t, int64(1), *rebuilt.AddedRows,

Review Comment:
   The producer's single data file has record-count 1, so
`capturedSnapshot.AddedRows == 1` and `*rebuilt.AddedRows == 1` agree only by
coincidence. Reverting the recompute path (carrying the captured AddedRows over
instead of deriving it from `writer.NextRowID() - firstRowID`) would still pass
this assertion. `TestRebuildFn_V3FirstRowIDZeroWhenNilNextRowID` has the same
weakness.
   
   I'd reshape the fixture so captured ≠ recomputed. Seeding 
`capturedSnapshot.AddedRows` to something distinct (e.g. 99) before the 
rebuild, or using a producer whose data files sum to a different count, makes 
the recompute the only path that passes.



##########
table/rebuild_manifest_test.go:
##########
@@ -0,0 +1,476 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+// newMetadataWithLastSeqNum builds a v2 Metadata whose last-sequence-number
+// equals lastSeqNum by grafting a synthetic snapshot at that sequence number.
+// Used by tests that need to control freshMeta.LastSequenceNumber().
+func newMetadataWithLastSeqNum(t *testing.T, lastSeqNum int64) Metadata {
+       t.Helper()
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+       meta, err := NewMetadata(schema, iceberg.UnpartitionedSpec, 
UnsortedSortOrder, "file:///tmp/seqtest",
+               iceberg.Properties{PropertyFormatVersion: "2"})
+       require.NoError(t, err)
+       builder, err := MetadataBuilderFromBase(meta, "")
+       require.NoError(t, err)
+       snap := Snapshot{
+               SnapshotID:     1000,
+               SequenceNumber: lastSeqNum,
+               TimestampMs:    meta.LastUpdatedMillis() + 1,
+               Summary:        &Summary{Operation: OpAppend},
+       }
+       require.NoError(t, builder.AddSnapshot(&snap))
+       require.NoError(t, builder.SetSnapshotRef(MainBranch, 1000, BranchRef))
+       out, err := builder.Build()
+       require.NoError(t, err)
+
+       return out
+}
+
+// newV3MetadataWithNextRowID builds a v3 Metadata whose NextRowID() returns
+// nextRowID by adding a synthetic snapshot that consumes that many rows.
+// Used by tests that need to control freshMeta.NextRowID() for v3 row lineage.
+func newV3MetadataWithNextRowID(t *testing.T, nextRowID int64) Metadata {
+       t.Helper()
+       spec := iceberg.NewPartitionSpec()
+       txn, _ := createTestTransactionWithMemIO(t, spec)
+       txn.meta.formatVersion = 3
+
+       firstRowID := int64(0)
+       addedRows := nextRowID
+       snap := Snapshot{
+               SnapshotID:     1000,
+               SequenceNumber: 1,
+               TimestampMs:    txn.meta.base.LastUpdatedMillis() + 1,
+               Summary:        &Summary{Operation: OpAppend},
+               FirstRowID:     &firstRowID,
+               AddedRows:      &addedRows,
+       }
+       require.NoError(t, txn.meta.AddSnapshot(&snap))
+       require.NoError(t, txn.meta.SetSnapshotRef(MainBranch, 1000, BranchRef))
+
+       meta, err := txn.meta.Build()
+       require.NoError(t, err)
+       require.Equal(t, nextRowID, meta.NextRowID())
+
+       return meta
+}
+
+// ptr returns a pointer to v, used in test helper expressions.
+// NOTE: ptr is also declared in pos_delete_partitioned_fanout_writer_test.go;
+// both files share the same package so only one declaration is needed.
+// This comment documents the shared usage — do not re-add a declaration here.
+
+// rebuildUpdate constructs an addSnapshotUpdate whose rebuildManifestList
+// closure simply records the freshParent it received and returns a new
+// snapshot whose ManifestList is the given newManifestList value.
+func rebuildUpdate(snap *Snapshot, newManifestList string, gotParent 
**Snapshot) *addSnapshotUpdate {
+       return &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
+               Snapshot:   snap,
+               rebuildManifestList: func(_ context.Context, _ Metadata, 
freshParent *Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+                       *gotParent = freshParent
+                       rebuilt := *snap
+                       rebuilt.ManifestList = newManifestList
+
+                       return &rebuilt, nil
+               },
+       }
+}
+
+// TestRebuildSnapshotUpdates_CallsClosureWithFreshParent verifies that
+// rebuildSnapshotUpdates invokes the rebuild closure and passes the fresh
+// branch head as freshParent.
+func TestRebuildSnapshotUpdates_CallsClosureWithFreshParent(t *testing.T) {
+       const oldManifest = "s3://bucket/old-manifest-list.avro"
+       const newManifest = "s3://bucket/new-manifest-list.avro"
+
+       // Build a fresh metadata that has a different snapshot than the one
+       // embedded in the update, so the parent-hasn't-changed guard is
+       // bypassed and the closure must be called.
+       freshHead := int64(42)
+       freshMeta := newConflictTestMetadata(t, &freshHead)
+
+       parentID := int64(7) // original snap's parent — does NOT match 
freshHead (42)
+       snap := &Snapshot{
+               SnapshotID:       99,
+               ParentSnapshotID: &parentID,
+               ManifestList:     oldManifest,
+               Summary:          &Summary{Operation: OpAppend},
+       }
+
+       var receivedParent *Snapshot
+       upd := rebuildUpdate(snap, newManifest, &receivedParent)
+
+       rebuilt, orphaned, err := rebuildSnapshotUpdates(
+               t.Context(),
+               []Update{upd},
+               freshMeta,
+               MainBranch,
+               iceio.LocalFS{},
+               1,
+       )
+       require.NoError(t, err)
+       require.Len(t, rebuilt, 1)
+       require.Len(t, orphaned, 1, "old manifest list must be recorded as 
orphaned")
+
+       // The closure should have been called with the branch's current head.
+       require.NotNil(t, receivedParent)
+       assert.Equal(t, freshHead, receivedParent.SnapshotID)
+
+       // The rebuilt update must carry the new manifest list.
+       addUpd, ok := rebuilt[0].(*addSnapshotUpdate)
+       require.True(t, ok)
+       assert.Equal(t, newManifest, addUpd.Snapshot.ManifestList)
+
+       // The superseded manifest list becomes an orphan.
+       assert.Equal(t, oldManifest, orphaned[0])
+}
+
+// TestRebuildSnapshotUpdates_SkipsWhenParentUnchanged verifies that
+// rebuildSnapshotUpdates skips the rebuild when the update's snapshot
+// already has the fresh branch head as its parent (no-op retry).
+func TestRebuildSnapshotUpdates_SkipsWhenParentUnchanged(t *testing.T) {
+       const manifest = "s3://bucket/manifest-list.avro"
+
+       freshHead := int64(42)
+       freshMeta := newConflictTestMetadata(t, &freshHead)
+
+       // Parent already equals the fresh head — rebuild must be skipped.
+       snap := &Snapshot{
+               SnapshotID:       99,
+               ParentSnapshotID: &freshHead, // same as fresh head
+               ManifestList:     manifest,
+               Summary:          &Summary{Operation: OpAppend},
+       }
+
+       called := false
+       upd := &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
+               Snapshot:   snap,
+               rebuildManifestList: func(_ context.Context, _ Metadata, _ 
*Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+                       called = true
+
+                       return snap, nil
+               },
+       }
+
+       rebuilt, orphaned, err := rebuildSnapshotUpdates(
+               t.Context(),
+               []Update{upd},
+               freshMeta,
+               MainBranch,
+               iceio.LocalFS{},
+               1,
+       )
+       require.NoError(t, err)
+       assert.False(t, called, "rebuild closure must not be called when parent 
is already up-to-date")
+       assert.Empty(t, orphaned, "no orphans when rebuild is skipped")
+       assert.Same(t, upd, rebuilt[0].(*addSnapshotUpdate), "original update 
must pass through unchanged")
+}
+
+// TestRebuildSnapshotUpdates_PassesThroughNonRebuildUpdates verifies that
+// updates without a rebuildManifestList closure are returned unmodified.
+func TestRebuildSnapshotUpdates_PassesThroughNonRebuildUpdates(t *testing.T) {
+       plainUpd := NewAddSnapshotUpdate(&Snapshot{
+               SnapshotID:   1,
+               ManifestList: "s3://bucket/no-rebuild.avro",
+               Summary:      &Summary{Operation: OpAppend},
+       })
+
+       // freshMeta may be nil — the plain update must not be touched.
+       rebuilt, orphaned, err := rebuildSnapshotUpdates(
+               t.Context(),
+               []Update{plainUpd},
+               nil,
+               MainBranch,
+               iceio.LocalFS{},
+               0,
+       )
+       require.NoError(t, err)
+       assert.Empty(t, orphaned)
+       assert.Same(t, plainUpd, rebuilt[0].(*addSnapshotUpdate))
+}
+
+// TestRebuildSnapshotUpdates_PropagatesClosureError verifies that an error
+// returned by the rebuild closure surfaces as the function's return error.
+func TestRebuildSnapshotUpdates_PropagatesClosureError(t *testing.T) {
+       freshHead := int64(5)
+       freshMeta := newConflictTestMetadata(t, &freshHead)
+
+       parent := int64(1)
+       snap := &Snapshot{
+               SnapshotID:       10,
+               ParentSnapshotID: &parent,
+               ManifestList:     "s3://bucket/old.avro",
+               Summary:          &Summary{Operation: OpAppend},
+       }
+       wantErr := errors.New("simulated S3 write failure")
+       upd := &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
+               Snapshot:   snap,
+               rebuildManifestList: func(_ context.Context, _ Metadata, _ 
*Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+                       return nil, wantErr
+               },
+       }
+
+       _, _, err := rebuildSnapshotUpdates(t.Context(), []Update{upd}, 
freshMeta, MainBranch, iceio.LocalFS{}, 1)
+       assert.ErrorIs(t, err, wantErr)
+}
+
+// ---------------------------------------------------------------------------
+// Fix 3 — newSeq derived from freshMeta.LastSequenceNumber()
+// ---------------------------------------------------------------------------
+
+// TestRebuildFn_SeqNumDerivedFromFreshMeta verifies that the rebuilt 
snapshot's
+// SequenceNumber equals freshMeta.LastSequenceNumber()+1, NOT
+// freshParent.SequenceNumber+1. A concurrent writer on a different branch can
+// advance the table-wide last-sequence-number without advancing this branch's
+// parent, so using freshParent.SequenceNumber+1 would violate the spec 
invariant.
+func TestRebuildFn_SeqNumDerivedFromFreshMeta(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       txn, wfs := createTestTransactionWithMemIO(t, spec)
+
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, _, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       require.NotNil(t, addSnap.rebuildManifestList, "rebuildManifestList 
closure must be set")
+
+       // Simulate a concurrent writer on another branch that bumped the global
+       // last-sequence-number to 99 without advancing this branch's parent.
+       // old code: newSeq = capturedSnapshot.SequenceNumber (stale, ≤ 99 — 
spec violation)
+       // new code: newSeq = freshMeta.LastSequenceNumber() + 1 = 100
+       freshMeta := newMetadataWithLastSeqNum(t, 99)
+       require.Equal(t, int64(99), freshMeta.LastSequenceNumber())
+
+       rebuilt, err := addSnap.rebuildManifestList(context.Background(), 
freshMeta, nil, wfs, 1)

Review Comment:
   `freshParent` is passed as `nil` here, so the closure takes the nil branch.
The actual fix site — the `freshParent != nil` branch, which switched from
`freshParent.SequenceNumber+1` to `freshMeta.LastSequenceNumber()+1` — has zero
coverage, so the peer-branch-advances-the-table-wide-seqnum scenario from the
commit message is never exercised.
   
   `TestRebuildFn_SummaryRebasedAgainstFreshParent` already wires both a 
non-nil freshParent and a freshMeta. Adding `require.Equal(t, 
freshMeta.LastSequenceNumber()+1, rebuilt.SequenceNumber)` there pins the 
freshParent branch without a new test.



##########
table/commit_retry_test.go:
##########
@@ -266,3 +307,244 @@ func TestReadRetryConfig_ClampsNegativeProperties(t 
*testing.T) {
        assert.Equal(t, uint(CommitMaxRetryWaitMsDefault), cfg.maxWaitMs)
        assert.Equal(t, uint(CommitTotalRetryTimeoutMsDefault), 
cfg.totalTimeoutMs)
 }
+
+// ---------------------------------------------------------------------------
+// Fix 5 — mandatory WriteFileIO check at top of doCommit
+// ---------------------------------------------------------------------------
+
+// TestDoCommit_NonWriteFileIOReturnsError verifies that doCommit fails
+// immediately when the table's file system does not implement WriteFileIO.
+// A silent skip would reuse the stale manifest list — exactly the bug
+// this PR was designed to fix.
+func TestDoCommit_NonWriteFileIOReturnsError(t *testing.T) {
+       cat := &flakyCatalog{}
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+       meta, err := NewMetadata(schema, iceberg.UnpartitionedSpec, 
UnsortedSortOrder, "file:///tmp/rotest",
+               iceberg.Properties{PropertyFormatVersion: "2"})
+       require.NoError(t, err)
+       cat.metadata = meta
+
+       tbl := New(
+               Identifier{"db", "ro-test"},
+               meta, "file:///tmp/rotest/v1.metadata.json",
+               func(context.Context) (iceio.IO, error) { return readOnlyIO{}, 
nil },
+               cat,
+       )
+
+       _, doErr := tbl.doCommit(t.Context(), nil, nil)
+       require.Error(t, doErr, "doCommit must fail when FS does not implement 
WriteFileIO")
+       assert.Contains(t, doErr.Error(), "WriteFileIO",
+               "error message must mention WriteFileIO so callers understand 
the requirement")
+       assert.Equal(t, int32(0), cat.attempts.Load(),
+               "CommitTable must not be called when FS check fails")
+}
+
+// ---------------------------------------------------------------------------
+// Fix 6 — orphan cleanup via defer
+// ---------------------------------------------------------------------------
+
+// newOCCTable creates a table that uses the given wfs for its FS and the given
+// catalog for commits. meta should include retry-config properties so that
+// doCommit's retry loop allows at least one retry.
+func newOCCTable(t *testing.T, meta Metadata, wfs iceio.WriteFileIO, cat 
CatalogIO) *Table {
+       t.Helper()
+
+       return New(
+               Identifier{"db", "occ-cleanup-test"},
+               meta,
+               "mem://default/table-location/metadata/v1.metadata.json",
+               func(context.Context) (iceio.IO, error) { return wfs, nil },
+               cat,
+       )
+}
+
+// newMemIOWithRetryMeta creates a test memIO and a matching table Metadata 
that
+// includes retry-config properties, so doCommit's retry loop allows retries.
+// The location matches createTestTransactionWithMemIO so they share the same
+// memIO for writing manifest files.
+func newMemIOWithRetryMeta(t *testing.T, spec iceberg.PartitionSpec) (*memIO, 
Metadata) {
+       t.Helper()
+       wfs := newMemIO(1<<20, nil)
+       schema := simpleSchema()
+       meta, err := NewMetadata(schema, &spec, UnsortedSortOrder, 
"mem://default/table-location",
+               iceberg.Properties{
+                       CommitNumRetriesKey:          "3",
+                       CommitMinRetryWaitMsKey:      "1",
+                       CommitMaxRetryWaitMsKey:      "2",
+                       CommitTotalRetryTimeoutMsKey: "60000",
+               })
+       require.NoError(t, err, "new metadata")
+
+       return wfs, meta
+}
+
+// TestDoCommit_OrphanCleanedOnSuccess verifies that manifest-list files
+// orphaned by OCC retries are removed after a successful commit. These files
+// are written during rebuild and must not leak on the happy path.
+func TestDoCommit_OrphanCleanedOnSuccess(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       // Build a transaction from the retry-enabled meta and commit via the 
producer.
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, reqs, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+
+       // Pre-populate the original manifest list path in the IO so that the
+       // defer's wfs.Remove call has a concrete entry to delete.
+       wfs.files[originalManifestList] = []byte("placeholder")
+
+       // Catalog: fail once with ErrCommitFailed (triggers rebuild that 
orphans
+       // originalManifestList), then succeed.
+       cat := &sequentialCatalog{
+               metadata: meta,
+               errs:     []error{ErrCommitFailed},
+       }
+       tbl = newOCCTable(t, meta, wfs, cat)
+
+       _, err = tbl.doCommit(t.Context(), updates, reqs, 
withCommitBranch(MainBranch))
+       require.NoError(t, err, "doCommit must succeed on the second attempt")
+
+       _, stillExists := wfs.files[originalManifestList]
+       assert.False(t, stillExists,
+               "orphaned manifest list must be removed after successful 
commit")
+}
+
+// TestDoCommit_OrphanNotCleanedOnUnknownError verifies that manifest-list
+// files are NOT removed when CommitTable returns an unknown 
non-ErrCommitFailed
+// error (5xx / gateway timeout). In that case the catalog may have silently
+// accepted the commit, meaning one of the "orphaned" files is actually the
+// live snapshot. Deleting it would permanently corrupt the table.
+func TestDoCommit_OrphanNotCleanedOnUnknownError(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, reqs, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+
+       wfs.files[originalManifestList] = []byte("placeholder")
+
+       unknown5xxErr := errors.New("simulated 5xx: internal server error")
+       // Catalog: fail once (ErrCommitFailed → rebuild → orphan created),
+       // then return a 5xx (non-ErrCommitFailed → cleanupOrphans=false).
+       cat := &sequentialCatalog{
+               metadata: meta,
+               errs:     []error{ErrCommitFailed, unknown5xxErr},
+       }
+       tbl = newOCCTable(t, meta, wfs, cat)
+
+       _, err = tbl.doCommit(t.Context(), updates, reqs, 
withCommitBranch(MainBranch))
+       require.ErrorIs(t, err, unknown5xxErr, "5xx error must propagate")
+
+       _, stillExists := wfs.files[originalManifestList]
+       assert.True(t, stillExists,
+               "orphaned manifest list must NOT be removed when commit outcome 
is unknown (5xx)")
+}
+
+// TestDoCommit_OrphanCleanedOnCommitDiverged verifies that manifest-list files
+// orphaned by rebuild attempts are removed when ErrCommitDiverged is returned
+// by a conflict validator. Diverged commits are terminal (no retry), and since
+// neither of the orphaned files was ever accepted by the catalog, they are 
safe
+// to delete. The defer cleanup runs with cleanupOrphans=true on this path.
+func TestDoCommit_OrphanCleanedOnCommitDiverged(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       // freshMeta has a snapshot on MainBranch so validators run on the 
retry attempt.
+       freshID := int64(42)
+       freshMeta := newConflictTestMetadataWithProps(t, &freshID, 
iceberg.Properties{
+               CommitNumRetriesKey:          "3",
+               CommitMinRetryWaitMsKey:      "1",
+               CommitMaxRetryWaitMsKey:      "2",
+               CommitTotalRetryTimeoutMsKey: "60000",
+       })
+
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, reqs, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+
+       wfs.files[originalManifestList] = []byte("placeholder")
+
+       // Catalog: fail once (ErrCommitFailed → rebuild → orphan created).
+       // LoadTable returns freshMeta (has branch snapshot → validators run on 
retry).
+       // Validator returns ErrCommitDiverged immediately.
+       cat := &sequentialCatalog{
+               metadata: meta,
+               loadMeta: freshMeta,
+               errs:     []error{ErrCommitFailed},
+       }
+       tbl = newOCCTable(t, meta, wfs, cat)
+
+       divergedValidator := func(*conflictContext) error { return 
ErrCommitDiverged }
+
+       _, err = tbl.doCommit(t.Context(), updates, reqs,
+               withCommitBranch(MainBranch),
+               withCommitValidators(divergedValidator),
+       )
+       require.ErrorIs(t, err, ErrCommitDiverged)
+
+       // The defer must have fired with cleanupOrphans=true and removed the 
orphan.
+       _, stillExists := wfs.files[originalManifestList]
+       assert.False(t, stillExists,
+               "orphaned manifest list must be removed even on 
ErrCommitDiverged: "+
+                       "the file was never accepted by the catalog so it is 
safe to delete")
+}
+
+// TestDoCommit_OrphanCleanedOnRetriesExhausted verifies that when every retry
+// attempt fails with ErrCommitFailed and the loop exits with the budget
+// exhausted, the defer still fires with cleanupOrphans=true. None of the
+// orphaned manifest-list files were ever accepted by the catalog, so they are
+// safe to delete on this terminal exit.
+func TestDoCommit_OrphanCleanedOnRetriesExhausted(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, reqs, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+
+       wfs.files[originalManifestList] = []byte("placeholder")
+
+       // numRetries=3 → 4 attempts; every attempt fails with ErrCommitFailed.
+       cat := &sequentialCatalog{
+               metadata: meta,
+               errs:     []error{ErrCommitFailed, ErrCommitFailed, 
ErrCommitFailed, ErrCommitFailed},

Review Comment:
   `sequentialCatalog.metadata` is the same object across all four LoadTable 
calls, so on each retry the closure sees a freshHead identical to base. None of 
the per-retry recompute (newSeq from `freshMeta.LastSequenceNumber()`, 
firstRowID from `freshMeta.NextRowID()`, summary rebased against freshParent) 
is actually exercised — this test pins terminal cleanup, not retry behavior.
   
   `headTrackingCatalog` in `commit_refresh_replay_test.go` already advances
   on every attempt. A single test with two or more retries built on it would
   add more coverage than these four orphan variants combined — worth a
   follow-up?



##########
table/commit_retry_test.go:
##########
@@ -266,3 +307,244 @@ func TestReadRetryConfig_ClampsNegativeProperties(t 
*testing.T) {
        assert.Equal(t, uint(CommitMaxRetryWaitMsDefault), cfg.maxWaitMs)
        assert.Equal(t, uint(CommitTotalRetryTimeoutMsDefault), 
cfg.totalTimeoutMs)
 }
+
+// ---------------------------------------------------------------------------
+// Fix 5 — mandatory WriteFileIO check at top of doCommit
+// ---------------------------------------------------------------------------
+
+// TestDoCommit_NonWriteFileIOReturnsError verifies that doCommit fails
+// immediately when the table's file system does not implement WriteFileIO.
+// A silent skip would reuse the stale manifest list — exactly the bug
+// this PR was designed to fix.
+func TestDoCommit_NonWriteFileIOReturnsError(t *testing.T) {
+       cat := &flakyCatalog{}
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+       meta, err := NewMetadata(schema, iceberg.UnpartitionedSpec, 
UnsortedSortOrder, "file:///tmp/rotest",
+               iceberg.Properties{PropertyFormatVersion: "2"})
+       require.NoError(t, err)
+       cat.metadata = meta
+
+       tbl := New(
+               Identifier{"db", "ro-test"},
+               meta, "file:///tmp/rotest/v1.metadata.json",
+               func(context.Context) (iceio.IO, error) { return readOnlyIO{}, 
nil },
+               cat,
+       )
+
+       _, doErr := tbl.doCommit(t.Context(), nil, nil)
+       require.Error(t, doErr, "doCommit must fail when FS does not implement 
WriteFileIO")
+       assert.Contains(t, doErr.Error(), "WriteFileIO",
+               "error message must mention WriteFileIO so callers understand 
the requirement")
+       assert.Equal(t, int32(0), cat.attempts.Load(),
+               "CommitTable must not be called when FS check fails")
+}
+
+// ---------------------------------------------------------------------------
+// Fix 6 — orphan cleanup via defer
+// ---------------------------------------------------------------------------
+
+// newOCCTable creates a table that uses the given wfs for its FS and the given
+// catalog for commits. meta should include retry-config properties so that
+// doCommit's retry loop allows at least one retry.
+func newOCCTable(t *testing.T, meta Metadata, wfs iceio.WriteFileIO, cat 
CatalogIO) *Table {
+       t.Helper()
+
+       return New(
+               Identifier{"db", "occ-cleanup-test"},
+               meta,
+               "mem://default/table-location/metadata/v1.metadata.json",
+               func(context.Context) (iceio.IO, error) { return wfs, nil },
+               cat,
+       )
+}
+
+// newMemIOWithRetryMeta creates a test memIO and a matching table Metadata 
that
+// includes retry-config properties, so doCommit's retry loop allows retries.
+// The location matches createTestTransactionWithMemIO so they share the same
+// memIO for writing manifest files.
+func newMemIOWithRetryMeta(t *testing.T, spec iceberg.PartitionSpec) (*memIO, 
Metadata) {
+       t.Helper()
+       wfs := newMemIO(1<<20, nil)
+       schema := simpleSchema()
+       meta, err := NewMetadata(schema, &spec, UnsortedSortOrder, 
"mem://default/table-location",
+               iceberg.Properties{
+                       CommitNumRetriesKey:          "3",
+                       CommitMinRetryWaitMsKey:      "1",
+                       CommitMaxRetryWaitMsKey:      "2",
+                       CommitTotalRetryTimeoutMsKey: "60000",
+               })
+       require.NoError(t, err, "new metadata")
+
+       return wfs, meta
+}
+
+// TestDoCommit_OrphanCleanedOnSuccess verifies that manifest-list files
+// orphaned by OCC retries are removed after a successful commit. These files
+// are written during rebuild and must not leak on the happy path.
+func TestDoCommit_OrphanCleanedOnSuccess(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       // Build a transaction from the retry-enabled meta and commit via the 
producer.
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, reqs, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+
+       // Pre-populate the original manifest list path in the IO so that the
+       // defer's wfs.Remove call has a concrete entry to delete.
+       wfs.files[originalManifestList] = []byte("placeholder")
+
+       // Catalog: fail once with ErrCommitFailed (triggers rebuild that 
orphans
+       // originalManifestList), then succeed.
+       cat := &sequentialCatalog{
+               metadata: meta,
+               errs:     []error{ErrCommitFailed},
+       }
+       tbl = newOCCTable(t, meta, wfs, cat)
+
+       _, err = tbl.doCommit(t.Context(), updates, reqs, 
withCommitBranch(MainBranch))
+       require.NoError(t, err, "doCommit must succeed on the second attempt")
+
+       _, stillExists := wfs.files[originalManifestList]

Review Comment:
   `memIO.Create` returns a `limitedWriteCloser` that doesn't write back into
   `wfs.files`, so neither the original nor the rebuilt manifest list is
   actually present in the FS. The test only sees `originalManifestList`
   because of the manual placeholder write on line 403. That means a
   regression that flipped the cleanup to delete the *committed* path instead
   of the orphan would slip through, since the committed path isn't in
   `wfs.files` either.
   
   Either persist `Create` writes into `wfs.files`, or add an assertion that 
`cat.metadata.CurrentSnapshot().ManifestList` differs from 
`originalManifestList` and was not removed. Same shape applies to the other 
three orphan-cleanup tests in this file.



##########
table/commit_retry_test.go:
##########
@@ -266,3 +307,244 @@ func TestReadRetryConfig_ClampsNegativeProperties(t 
*testing.T) {
        assert.Equal(t, uint(CommitMaxRetryWaitMsDefault), cfg.maxWaitMs)
        assert.Equal(t, uint(CommitTotalRetryTimeoutMsDefault), 
cfg.totalTimeoutMs)
 }
+
+// ---------------------------------------------------------------------------
+// Fix 5 — mandatory WriteFileIO check at top of doCommit
+// ---------------------------------------------------------------------------
+
+// TestDoCommit_NonWriteFileIOReturnsError verifies that doCommit fails
+// immediately when the table's file system does not implement WriteFileIO.
+// A silent skip would reuse the stale manifest list — exactly the bug
+// this PR was designed to fix.
+func TestDoCommit_NonWriteFileIOReturnsError(t *testing.T) {
+       cat := &flakyCatalog{}
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+       meta, err := NewMetadata(schema, iceberg.UnpartitionedSpec, 
UnsortedSortOrder, "file:///tmp/rotest",
+               iceberg.Properties{PropertyFormatVersion: "2"})
+       require.NoError(t, err)
+       cat.metadata = meta
+
+       tbl := New(
+               Identifier{"db", "ro-test"},
+               meta, "file:///tmp/rotest/v1.metadata.json",
+               func(context.Context) (iceio.IO, error) { return readOnlyIO{}, 
nil },
+               cat,
+       )
+
+       _, doErr := tbl.doCommit(t.Context(), nil, nil)
+       require.Error(t, doErr, "doCommit must fail when FS does not implement 
WriteFileIO")
+       assert.Contains(t, doErr.Error(), "WriteFileIO",

Review Comment:
   While we're here — the substring match on `"WriteFileIO"` is brittle: any
   rewording of the error message breaks it.
   `TestComputeOwnManifests_SnapshotByIDError` and `_ParentManifestsIOError`
   use a bare `require.Error` for the same shape. A sentinel error checked
   with `errors.Is` would be the more durable form.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to