laskoviymishka commented on code in PR #982:
URL: https://github.com/apache/iceberg-go/pull/982#discussion_r3202392999


##########
table/commit_retry_test.go:
##########
@@ -266,3 +307,244 @@ func TestReadRetryConfig_ClampsNegativeProperties(t 
*testing.T) {
        assert.Equal(t, uint(CommitMaxRetryWaitMsDefault), cfg.maxWaitMs)
        assert.Equal(t, uint(CommitTotalRetryTimeoutMsDefault), 
cfg.totalTimeoutMs)
 }
+
+// ---------------------------------------------------------------------------
+// Fix 5 — mandatory WriteFileIO check at top of doCommit
+// ---------------------------------------------------------------------------
+
+// TestDoCommit_NonWriteFileIOReturnsError verifies that doCommit fails
+// immediately when the table's file system does not implement WriteFileIO.
+// A silent skip would reuse the stale manifest list — exactly the bug
+// this PR was designed to fix.
+func TestDoCommit_NonWriteFileIOReturnsError(t *testing.T) {
+       // flakyCatalog exposes an attempts counter that the final assertion
+       // treats as "number of CommitTable calls"; it must stay at zero.
+       cat := &flakyCatalog{}
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+       meta, err := NewMetadata(schema, iceberg.UnpartitionedSpec, 
UnsortedSortOrder, "file:///tmp/rotest",
+               iceberg.Properties{PropertyFormatVersion: "2"})
+       require.NoError(t, err)
+       cat.metadata = meta
+
+       // The FS loader always returns readOnlyIO, which this test expects not
+       // to implement WriteFileIO (NOTE(review): readOnlyIO is defined outside
+       // this excerpt — confirm it only implements iceio.IO).
+       tbl := New(
+               Identifier{"db", "ro-test"},
+               meta, "file:///tmp/rotest/v1.metadata.json",
+               func(context.Context) (iceio.IO, error) { return readOnlyIO{}, 
nil },
+               cat,
+       )
+
+       // nil updates/requirements are passed on purpose: the WriteFileIO check
+       // is expected to fire at the top of doCommit, before either is used.
+       _, doErr := tbl.doCommit(t.Context(), nil, nil)
+       require.Error(t, doErr, "doCommit must fail when FS does not implement 
WriteFileIO")
+       assert.Contains(t, doErr.Error(), "WriteFileIO",
+               "error message must mention WriteFileIO so callers understand 
the requirement")
+       assert.Equal(t, int32(0), cat.attempts.Load(),
+               "CommitTable must not be called when FS check fails")
+}
+
+// ---------------------------------------------------------------------------
+// Fix 6 — orphan cleanup via defer
+// ---------------------------------------------------------------------------
+
+// newOCCTable creates a table that uses the given wfs for its FS and the given
+// catalog for commits. meta should include retry-config properties so that
+// doCommit's retry loop allows at least one retry.
+func newOCCTable(t *testing.T, meta Metadata, wfs iceio.WriteFileIO, cat 
CatalogIO) *Table {
+       t.Helper()
+
+       // The metadata location sits under mem://default/table-location, the
+       // same table location newMemIOWithRetryMeta uses. The FS loader ignores
+       // its context and always returns wfs, so every commit attempt shares
+       // one in-memory IO.
+       return New(
+               Identifier{"db", "occ-cleanup-test"},
+               meta,
+               "mem://default/table-location/metadata/v1.metadata.json",
+               func(context.Context) (iceio.IO, error) { return wfs, nil },
+               cat,
+       )
+}
+
+// newMemIOWithRetryMeta creates a test memIO and a matching table Metadata 
that
+// includes retry-config properties, so doCommit's retry loop allows retries.
+// The location matches createTestTransactionWithMemIO so they share the same
+// memIO for writing manifest files.
+func newMemIOWithRetryMeta(t *testing.T, spec iceberg.PartitionSpec) (*memIO, 
Metadata) {
+       t.Helper()
+       // NOTE(review): the meaning of newMemIO's arguments (1<<20, nil) is not
+       // visible in this excerpt — presumably a size limit plus optional
+       // seed data; confirm against newMemIO.
+       wfs := newMemIO(1<<20, nil)
+       schema := simpleSchema()
+       meta, err := NewMetadata(schema, &spec, UnsortedSortOrder, 
"mem://default/table-location",
+               iceberg.Properties{
+                       // 3 retries after the first attempt.
+                       CommitNumRetriesKey:          "3",
+                       // Tiny min/max backoff (the key names say ms) keeps
+                       // retrying tests fast.
+                       CommitMinRetryWaitMsKey:      "1",
+                       CommitMaxRetryWaitMsKey:      "2",
+                       // Generous total budget so the retry count, not the
+                       // timeout, is what bounds the loop.
+                       CommitTotalRetryTimeoutMsKey: "60000",
+               })
+       require.NoError(t, err, "new metadata")
+
+       return wfs, meta
+}
+
+// TestDoCommit_OrphanCleanedOnSuccess verifies that manifest-list files
+// orphaned by OCC retries are removed after a successful commit. These files
+// are written during rebuild and must not leak on the happy path.
+func TestDoCommit_OrphanCleanedOnSuccess(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       // Build a transaction from the retry-enabled meta and commit via the 
producer.
+       // This first table has no catalog: it only feeds the snapshot producer.
+       // A catalog-backed table is built below for the real doCommit call.
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       // NOTE(review): doCommit below uses t.Context(); using it here as well
+       // would be consistent and scope this call to the test's lifetime.
+       updates, reqs, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       // NOTE(review): assumes updates[0] is the *addSnapshotUpdate; this
+       // unchecked assertion panics instead of failing the test cleanly if the
+       // producer's update order ever changes.
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+
+       // Pre-populate the original manifest list path in the IO so that the
+       // defer's wfs.Remove call has a concrete entry to delete.
+       wfs.files[originalManifestList] = []byte("placeholder")
+
+       // Catalog: fail once with ErrCommitFailed (triggers rebuild that 
orphans
+       // originalManifestList), then succeed.
+       // NOTE(review): sequentialCatalog serves the same meta on every attempt,
+       // so the rebuild sees an unchanged parent. This covers orphan cleanup,
+       // not retrying against an advancing table head.
+       cat := &sequentialCatalog{
+               metadata: meta,
+               errs:     []error{ErrCommitFailed},
+       }
+       tbl = newOCCTable(t, meta, wfs, cat)
+
+       _, err = tbl.doCommit(t.Context(), updates, reqs, 
withCommitBranch(MainBranch))
+       require.NoError(t, err, "doCommit must succeed on the second attempt")
+
+       // Inspect memIO's backing map directly: the orphan must be gone.
+       _, stillExists := wfs.files[originalManifestList]
+       assert.False(t, stillExists,
+               "orphaned manifest list must be removed after successful 
commit")
+}
+
+// TestDoCommit_OrphanNotCleanedOnUnknownError verifies that manifest-list
+// files are NOT removed when CommitTable returns an unknown 
non-ErrCommitFailed
+// error (5xx / gateway timeout). In that case the catalog may have silently
+// accepted the commit, meaning one of the "orphaned" files is actually the
+// live snapshot. Deleting it would permanently corrupt the table.
+func TestDoCommit_OrphanNotCleanedOnUnknownError(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       // Catalog-less table used only to produce updates/reqs via the producer.
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       // NOTE(review): t.Context() (as used for doCommit below) would be more
+       // consistent than context.Background() here.
+       updates, reqs, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       // NOTE(review): unchecked type assertion; panics rather than failing
+       // cleanly if updates[0] is not an *addSnapshotUpdate.
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+
+       // Give the would-be orphan a concrete entry so the test can tell
+       // whether cleanup removed it.
+       wfs.files[originalManifestList] = []byte("placeholder")
+
+       // Not ErrCommitFailed, so doCommit treats the outcome as unknown.
+       unknown5xxErr := errors.New("simulated 5xx: internal server error")
+       // Catalog: fail once (ErrCommitFailed → rebuild → orphan created),
+       // then return a 5xx (non-ErrCommitFailed → cleanupOrphans=false).
+       // NOTE(review): the same meta is served on every attempt, so the
+       // rebuild sees an unchanged parent.
+       cat := &sequentialCatalog{
+               metadata: meta,
+               errs:     []error{ErrCommitFailed, unknown5xxErr},
+       }
+       tbl = newOCCTable(t, meta, wfs, cat)
+
+       _, err = tbl.doCommit(t.Context(), updates, reqs, 
withCommitBranch(MainBranch))
+       require.ErrorIs(t, err, unknown5xxErr, "5xx error must propagate")
+
+       // The file must survive: it may be referenced by a commit the catalog
+       // accepted despite reporting an error.
+       _, stillExists := wfs.files[originalManifestList]
+       assert.True(t, stillExists,
+               "orphaned manifest list must NOT be removed when commit outcome 
is unknown (5xx)")
+}
+
+// TestDoCommit_OrphanCleanedOnCommitDiverged verifies that manifest-list files
+// orphaned by rebuild attempts are removed when ErrCommitDiverged is returned
+// by a conflict validator. Diverged commits are terminal (no retry), and since
+// neither of the orphaned files was ever accepted by the catalog, they are 
safe
+// to delete. The defer cleanup runs with cleanupOrphans=true on this path.
+func TestDoCommit_OrphanCleanedOnCommitDiverged(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       // freshMeta has a snapshot on MainBranch so validators run on the 
retry attempt.
+       // NOTE(review): these retry properties duplicate the ones in
+       // newMemIOWithRetryMeta; a shared helper or map would keep them in sync.
+       freshID := int64(42)
+       freshMeta := newConflictTestMetadataWithProps(t, &freshID, 
iceberg.Properties{
+               CommitNumRetriesKey:          "3",
+               CommitMinRetryWaitMsKey:      "1",
+               CommitMaxRetryWaitMsKey:      "2",
+               CommitTotalRetryTimeoutMsKey: "60000",
+       })
+
+       // Catalog-less table used only to produce updates/reqs via the producer.
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       // NOTE(review): t.Context() would be consistent with the doCommit call.
+       updates, reqs, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       // NOTE(review): unchecked type assertion; panics rather than failing
+       // cleanly if updates[0] is not an *addSnapshotUpdate.
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+
+       wfs.files[originalManifestList] = []byte("placeholder")
+
+       // Catalog: fail once (ErrCommitFailed → rebuild → orphan created).
+       // LoadTable returns freshMeta (has branch snapshot → validators run on 
retry).
+       // Validator returns ErrCommitDiverged immediately.
+       // NOTE(review): loadMeta is a fixed value, so the head does not advance
+       // across attempts; only the divergence path is exercised here.
+       cat := &sequentialCatalog{
+               metadata: meta,
+               loadMeta: freshMeta,
+               errs:     []error{ErrCommitFailed},
+       }
+       tbl = newOCCTable(t, meta, wfs, cat)
+
+       // Unconditionally reports divergence, making the retry terminal.
+       divergedValidator := func(*conflictContext) error { return 
ErrCommitDiverged }
+
+       _, err = tbl.doCommit(t.Context(), updates, reqs,
+               withCommitBranch(MainBranch),
+               withCommitValidators(divergedValidator),
+       )
+       require.ErrorIs(t, err, ErrCommitDiverged)
+
+       // The defer must have fired with cleanupOrphans=true and removed the 
orphan.
+       _, stillExists := wfs.files[originalManifestList]
+       assert.False(t, stillExists,
+               "orphaned manifest list must be removed even on 
ErrCommitDiverged: "+
+                       "the file was never accepted by the catalog so it is 
safe to delete")
+}
+
+// TestDoCommit_OrphanCleanedOnRetriesExhausted verifies that when every retry
+// attempt fails with ErrCommitFailed and the loop exits with the budget
+// exhausted, the defer still fires with cleanupOrphans=true. None of the
+// orphaned manifest-list files were ever accepted by the catalog, so they are
+// safe to delete on this terminal exit.
+func TestDoCommit_OrphanCleanedOnRetriesExhausted(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       wfs, meta := newMemIOWithRetryMeta(t, spec)
+
+       tbl := newOCCTable(t, meta, wfs, nil)
+       txn := tbl.NewTransaction()
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, reqs, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       originalManifestList := addSnap.Snapshot.ManifestList
+
+       wfs.files[originalManifestList] = []byte("placeholder")
+
+       // numRetries=3 → 4 attempts; every attempt fails with ErrCommitFailed.
+       cat := &sequentialCatalog{
+               metadata: meta,
+               errs:     []error{ErrCommitFailed, ErrCommitFailed, 
ErrCommitFailed, ErrCommitFailed},

Review Comment:
   [Non-blocking] None of the `TestDoCommit_Orphan*` tests advance `freshMeta` 
between retries — `sequentialCatalog.metadata` is the same object on every 
`LoadTable` call, so on each retry the rebuild closure sees a `freshHead` that 
equals the original parent (or is nil for an empty table). The 
skip-if-parent-unchanged predicate could short-circuit, and even when it 
doesn't, the closure runs in a degenerate mode where every per-retry recompute 
(newSeq, firstRowID, summary) reproduces the values captured on attempt 0. So 
this test exercises the orphan-cleanup state machine, but not the actual *retry* 
semantics the fixes target. It is worth adding one end-to-end test that drives 
≥2 ErrCommitFailed retries with a `freshMeta` that advances between attempts 
(the existing `headTrackingCatalog` in `commit_refresh_replay_test.go` is most 
of the way there). That single test would add more coverage than the four 
orphan-cleanup variants combined, because it exercises every fix in this PR at 
once: the orphan list grows by N, each rebuild sees a different 
`LastSequenceNumber`/`NextRowID`, and the summary chains on top of an evolving 
parent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to