mzzz-zzm commented on issue #976:
URL: https://github.com/apache/iceberg-go/issues/976#issuecomment-4369918376

   ## Suggested fix
   
   With the patch below, `doCommit` calls a per-snapshot rebuild closure on
   each optimistic concurrency control (OCC) retry. The closure rewrites the
   manifest list Avro file so that the retry snapshot inherits every data file
   committed by concurrent writers since the original snapshot was built: the
   retrying writer's own newly written manifests are combined with the
   manifests inherited from the fresh parent, which yields a correct manifest
   list for the retry attempt. Manifest list files orphaned by superseded
   retry attempts are deleted after the commit succeeds.
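
The idea can be sketched with a toy model. The types and names below (`snapshot`, `rebuildManifestList`, the `.avro` paths) are hypothetical stand-ins rather than the iceberg-go API; the sketch only shows how a retry combines the writer's own manifests with whatever the fresh parent now lists.

```go
package main

import "fmt"

// snapshot is a hypothetical stand-in for an Iceberg snapshot: an ID plus
// the manifest paths recorded in its manifest list.
type snapshot struct {
	id        int64
	manifests []string
}

// rebuildManifestList returns the manifest list for a retry attempt: the
// writer's own manifests followed by everything the fresh parent lists.
// A nil parent means the branch is empty, so only own manifests remain.
func rebuildManifestList(own []string, freshParent *snapshot) []string {
	combined := append([]string{}, own...)
	if freshParent != nil {
		combined = append(combined, freshParent.manifests...)
	}
	return combined
}

func main() {
	// Writer B built its snapshot on top of parent 1 ...
	stale := &snapshot{id: 1, manifests: []string{"m-base.avro"}}
	own := []string{"m-writer-b.avro"}
	// ... but writer A committed snapshot 2 first, adding m-writer-a.avro.
	fresh := &snapshot{id: 2, manifests: []string{"m-writer-a.avro", "m-base.avro"}}

	fmt.Println(rebuildManifestList(own, stale)) // list built before the conflict
	fmt.Println(rebuildManifestList(own, fresh)) // list rebuilt on retry
}
```

Without the rebuild, the retry would resubmit the first list and silently drop `m-writer-a.avro` from the table.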
   
   ## Files changed
   
   | File | Role |
   |---|---|
   | `table/snapshot_producers.go` | Computes "own" manifests; attaches rebuild closure to `addSnapshotUpdate` |
   | `table/updates.go` | Adds `ownManifests` and `rebuildManifestList` fields to `addSnapshotUpdate`; propagates them through `Apply` |
   | `table/table.go` | Calls `rebuildSnapshotUpdates` on each retry; cleans up orphaned manifest list files after success |
   
   ## Diff
   
   ### `table/updates.go`
   
   ```diff
    type addSnapshotUpdate struct {
        baseUpdate
        Snapshot *Snapshot `json:"snapshot"`
   +
   +    // ownManifests holds the manifests written by this producer (those
   +    // NOT inherited from the parent snapshot). Populated by
   +    // snapshotProducer.commit and used by rebuildManifestList below.
   +    ownManifests []iceberg.ManifestFile
   +
   +    // rebuildManifestList, when non-nil, regenerates the snapshot's
   +    // manifest list to inherit from freshParent and combines it with
   +    // ownManifests. Called by doCommit on every retry attempt so that
   +    // each retry snapshot correctly inherits all files committed by
   +    // concurrent writers since the original build.
   +    rebuildManifestList func(ctx context.Context, freshParent *Snapshot, fio io.WriteFileIO, attempt int) (*Snapshot, error)
    }
   
    func NewAddSnapshotUpdate(snapshot *Snapshot) *addSnapshotUpdate {
   ```
   
   ```diff
    func (u *addSnapshotUpdate) Apply(builder *MetadataBuilder) error {
   -    return builder.AddSnapshot(u.Snapshot)
   +    if err := builder.AddSnapshot(u.Snapshot); err != nil {
   +            return err
   +    }
   +
   +    // Propagate the rebuild closure to the newly-added update object so
   +    // that doCommit's retry loop can regenerate the manifest list after
   +    // an OCC conflict. MetadataBuilder.AddSnapshot always appends a fresh
   +    // *addSnapshotUpdate as the last element of builder.updates; we reach
   +    // back and copy our runtime-only fields onto it.
   +    if u.rebuildManifestList != nil {
   +            if n := len(builder.updates); n > 0 {
   +                    if su, ok := builder.updates[n-1].(*addSnapshotUpdate); ok {
   +                            su.ownManifests = u.ownManifests
   +                            su.rebuildManifestList = u.rebuildManifestList
   +                    }
   +            }
   +    }
   +
   +    return nil
    }
   ```
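
The "reach back and copy" step is easier to see in isolation. The following is a minimal sketch, assuming (as the comment in the diff states) that the builder appends a fresh update object that lacks the runtime-only fields. The `update` and `builder` types are hypothetical and much smaller than the real ones.

```go
package main

import "fmt"

// update is a hypothetical update carrying one runtime-only closure that
// is never serialized.
type update struct {
	name    string
	rebuild func() string
}

// builder is a hypothetical stand-in for MetadataBuilder.
type builder struct{ updates []*update }

// add records a fresh update built from serializable data only, so any
// runtime-only fields on the caller's object are lost.
func (b *builder) add(name string) {
	b.updates = append(b.updates, &update{name: name})
}

// apply adds u to the builder, then copies u's closure onto the entry the
// builder just appended so that later retries can still call it.
func (u *update) apply(b *builder) {
	b.add(u.name)
	if u.rebuild == nil {
		return
	}
	if n := len(b.updates); n > 0 {
		b.updates[n-1].rebuild = u.rebuild
	}
}

func main() {
	b := &builder{}
	u := &update{name: "add-snapshot", rebuild: func() string { return "rebuilt" }}
	u.apply(b)
	fmt.Println(b.updates[0].rebuild != nil) // true
}
```

The approach depends on the builder appending exactly one entry per call; if that ever changes, the copy would land on the wrong element, which is why the patch guards it with a type assertion.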
   
   ### `table/snapshot_producers.go`
   
   A new helper separates the producer's own manifests from those inherited from the stale parent:
   
   ```diff
   +// computeOwnManifests returns the subset of allManifests that were written
   +// by this producer (i.e. not inherited from the parent snapshot).
   +func (sp *snapshotProducer) computeOwnManifests(allManifests []iceberg.ManifestFile) []iceberg.ManifestFile {
   +    if sp.parentSnapshotID <= 0 {
   +            return allManifests
   +    }
   +
   +    parent, err := sp.txn.meta.SnapshotByID(sp.parentSnapshotID)
   +    if err != nil || parent == nil {
   +            return allManifests
   +    }
   +
   +    parentManifests, err := parent.Manifests(sp.io)
   +    if err != nil {
   +            return allManifests
   +    }
   +
   +    inherited := make(map[string]bool, len(parentManifests))
   +    for _, m := range parentManifests {
   +            inherited[m.FilePath()] = true
   +    }
   +
   +    own := make([]iceberg.ManifestFile, 0, len(allManifests))
   +    for _, m := range allManifests {
   +            if !inherited[m.FilePath()] {
   +                    own = append(own, m)
   +            }
   +    }
   +    return own
   +}
   ```
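
The filtering step is a set difference keyed on file path. Here is a minimal self-contained sketch that uses plain strings in place of `iceberg.ManifestFile`; the `ownPaths` helper is illustrative and not part of the patch.

```go
package main

import "fmt"

// ownPaths returns the entries of all that do not appear in parent,
// preserving the order of all.
func ownPaths(all, parent []string) []string {
	inherited := make(map[string]struct{}, len(parent))
	for _, p := range parent {
		inherited[p] = struct{}{}
	}

	own := make([]string, 0, len(all))
	for _, p := range all {
		if _, ok := inherited[p]; !ok {
			own = append(own, p)
		}
	}
	return own
}

func main() {
	all := []string{"new-1.avro", "old-1.avro", "new-2.avro", "old-2.avro"}
	parent := []string{"old-1.avro", "old-2.avro"}
	fmt.Println(ownPaths(all, parent)) // [new-1.avro new-2.avro]
}
```

Matching on path assumes that every manifest the producer writes gets a path the parent does not already list.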
   
   Two hunks change in `commit()`: a new variable declaration just after the
   `newManifests` error check, and the return statement at the end of the
   function.
   
   **Hunk 1** — add `ownManifests` variable:
   
   ```diff
    func (sp *snapshotProducer) commit(ctx context.Context) (_ []Update, _ []Requirement, err error) {
        newManifests, err := sp.manifests(ctx)
        if err != nil {
                return nil, nil, err
        }
   
   +    // Separate "own" manifests (those written by this producer) from
   +    // manifests inherited from the stale parent. The own manifests are
   +    // preserved when the manifest list is rebuilt during OCC retries.
   +    ownManifests := sp.computeOwnManifests(newManifests)
   +
        nextSequence := sp.txn.meta.nextSequenceNumber()
   ```
   
   **Hunk 2** — add the rebuild closure and change the return statement:
   
   ```diff
        }
   
   +    // Build the manifest-list rebuild closure. It is called by doCommit
   +    // on each OCC retry to regenerate the manifest list so it correctly
   +    // inherits all data files committed by concurrent writers since the
   +    // original snapshot was built.
   +    formatVersion := sp.txn.meta.formatVersion
   +    snapshotID := sp.snapshotID
   +    commitUUID := sp.commitUuid
   +    capturedSnapshot := snapshot
   +    processManifestsFn := func(m []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) {
   +            return sp.processManifests(m)
   +    }
   +
   +    rebuildFn := func(_ context.Context, freshParent *Snapshot, fio iceio.WriteFileIO, attempt int) (_ *Snapshot, retErr error) {
   +            var inherited []iceberg.ManifestFile
   +            if freshParent != nil {
   +                    inherited, retErr = freshParent.Manifests(fio)
   +                    if retErr != nil {
   +                            return nil, fmt.Errorf("rebuild manifest list: load parent manifests: %w", retErr)
   +                    }
   +            }
   +
   +            combined, procErr := processManifestsFn(slices.Concat(ownManifests, inherited))
   +            if procErr != nil {
   +                    return nil, fmt.Errorf("rebuild manifest list: process manifests: %w", procErr)
   +            }
   +
   +            var newSeq int64
   +            if freshParent != nil && formatVersion >= 2 {
   +                    newSeq = freshParent.SequenceNumber + 1
   +            } else {
   +                    newSeq = capturedSnapshot.SequenceNumber
   +            }
   +
   +            fname := newManifestListFileName(snapshotID, attempt, commitUUID)
   +            manifestListPath := locProvider.NewMetadataLocation(fname)
   +
   +            out, createErr := fio.Create(manifestListPath)
   +            if createErr != nil {
   +                    return nil, fmt.Errorf("rebuild manifest list: create file: %w", createErr)
   +            }
   +            defer internal.CheckedClose(out, &retErr)
   +
   +            var parentID *int64
   +            if freshParent != nil {
   +                    id := freshParent.SnapshotID
   +                    parentID = &id
   +            }
   +
   +            firstRowID := int64(0)
   +            if formatVersion == 3 {
   +                    writer, wrErr := iceberg.NewManifestListWriterV3(out, snapshotID, newSeq, firstRowID, parentID)
   +                    if wrErr != nil {
   +                            return nil, fmt.Errorf("rebuild manifest list: create v3 writer: %w", wrErr)
   +                    }
   +                    defer internal.CheckedClose(writer, &retErr)
   +                    if addErr := writer.AddManifests(combined); addErr != nil {
   +                            return nil, fmt.Errorf("rebuild manifest list: add manifests: %w", addErr)
   +                    }
   +            } else {
   +                    if wErr := iceberg.WriteManifestList(formatVersion, out, snapshotID, parentID, &newSeq, firstRowID, combined); wErr != nil {
   +                            return nil, fmt.Errorf("rebuild manifest list: write: %w", wErr)
   +                    }
   +            }
   +
   +            rebuilt := capturedSnapshot
   +            rebuilt.ManifestList = manifestListPath
   +            rebuilt.ParentSnapshotID = parentID
   +            rebuilt.SequenceNumber = newSeq
   +            return &rebuilt, nil
   +    }
   +
   +    addSnap := NewAddSnapshotUpdate(&snapshot)
   +    addSnap.ownManifests = ownManifests
   +    addSnap.rebuildManifestList = rebuildFn
   +
        return []Update{
   -            NewAddSnapshotUpdate(&snapshot),
   -            NewSetSnapshotRefUpdate(branch, sp.snapshotID, BranchRef, -1, -1, -1),
   +            addSnap,
   +            // Use 0 (not -1) for the optional fields so they are omitted by
   +            // `omitempty` in JSON marshalling. -1 is a sentinel meaning
   +            // "no limit" internally, but strict catalogs such as AWS S3 Tables
   +            // reject a payload that explicitly contains negative values.
   +            NewSetSnapshotRefUpdate(branch, sp.snapshotID, BranchRef, 0, 0, 0),
        }, []Requirement{
                AssertRefSnapshotID(branch, sp.txn.meta.currentSnapshotID),
        }, nil
   ```
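
The `omitempty` remark in the code comment can be checked with the standard library alone. The struct below is a hypothetical stand-in for the ref update payload; its field and JSON names are assumptions for illustration, not the actual iceberg-go definition.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// refUpdate is a hypothetical payload with one optional numeric field.
type refUpdate struct {
	RefName     string `json:"ref-name"`
	MaxRefAgeMs int64  `json:"max-ref-age-ms,omitempty"`
}

// encode marshals the payload and returns the JSON as a string.
func encode(u refUpdate) string {
	b, err := json.Marshal(u)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	// -1 is not the zero value, so omitempty keeps it in the payload.
	fmt.Println(encode(refUpdate{RefName: "main", MaxRefAgeMs: -1}))
	// {"ref-name":"main","max-ref-age-ms":-1}

	// 0 is the zero value, so the field is dropped entirely.
	fmt.Println(encode(refUpdate{RefName: "main", MaxRefAgeMs: 0}))
	// {"ref-name":"main"}
}
```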
   
   ### `table/table.go`
   
   `doCommit` calls the rebuild on each retry and cleans up orphaned files after
   the successful commit:
   
   ```diff
   +    var orphanedManifests []string
   
        for attempt := range totalAttempts {
                if attempt != 0 {
                        current = fresh.metadata
                        reqs = rewriteRefSnapshotRequirements(reqs, co.branch, current)
   +
   +                    if wfs, ok := fs.(icebergio.WriteFileIO); ok {
   +                            rebuiltUpdates, orphaned, rebuildErr := rebuildSnapshotUpdates(retryCtx, updates, current, co.branch, wfs, int(attempt))
   +                            if rebuildErr != nil {
   +                                    return nil, fmt.Errorf("rebuild manifest list for retry attempt %d: %w", attempt, rebuildErr)
   +                            }
   +                            orphanedManifests = append(orphanedManifests, orphaned...)
   +                            updates = rebuiltUpdates
   +                    }
                }
        }
   
   +    // Remove orphaned manifest list files written by superseded retry attempts.
   +    if len(orphanedManifests) > 0 {
   +            if wfs, ok := fs.(icebergio.WriteFileIO); ok {
   +                    for _, path := range orphanedManifests {
   +                            if removeErr := wfs.Remove(path); removeErr != nil {
   +                                    log.Printf("Warning: failed to delete orphaned manifest list %s: %v", path, removeErr)
   +                            }
   +                    }
   +            }
   +    }
   ```
   
   New helper `rebuildSnapshotUpdates` (full implementation in `table/table.go`):
   
   ```go
   // rebuildSnapshotUpdates returns a new slice of updates where any
   // addSnapshotUpdate that carries a rebuildManifestList closure has its
   // snapshot regenerated to inherit all data files committed to the branch
   // since the original snapshot was built.
   func rebuildSnapshotUpdates(ctx context.Context, updates []Update, freshMeta Metadata, branch string, fs icebergio.WriteFileIO, attempt int) (rebuilt []Update, orphanedPaths []string, err error) {
        var freshHead *Snapshot
        if branch != "" && freshMeta != nil {
                freshHead = freshMeta.SnapshotByName(branch)
        } else if freshMeta != nil {
                freshHead = freshMeta.CurrentSnapshot()
        }
   
        result := make([]Update, len(updates))
        copy(result, updates)
   
        for i, u := range result {
                su, ok := u.(*addSnapshotUpdate)
                if !ok || su.rebuildManifestList == nil {
                        continue
                }
                // Skip if the parent has not changed; this saves an unnecessary write.
                if freshHead != nil && su.Snapshot.ParentSnapshotID != nil &&
                        *su.Snapshot.ParentSnapshotID == freshHead.SnapshotID {
                        continue
                }
   
                oldManifestList := su.Snapshot.ManifestList
                newSnap, rebuildErr := su.rebuildManifestList(ctx, freshHead, fs, attempt)
                if rebuildErr != nil {
                        return nil, nil, rebuildErr
                }
                result[i] = &addSnapshotUpdate{
                        baseUpdate:          su.baseUpdate,
                        Snapshot:            newSnap,
                        ownManifests:        su.ownManifests,
                        rebuildManifestList: su.rebuildManifestList,
                }
                orphanedPaths = append(orphanedPaths, oldManifestList)
        }
        return result, orphanedPaths, nil
   }
   ```
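
To make the skip check and the orphan bookkeeping concrete, here is a simplified model of the loop in the helper. `snap`, `rebuildAll`, and the `rebuild` callback are hypothetical names; the real helper operates on `Update` values and writes Avro files.

```go
package main

import "fmt"

// snap is a hypothetical, reduced snapshot: its parent ID and the path of
// its manifest list file.
type snap struct {
	parentID     int64
	manifestList string
}

// rebuildAll mirrors the shape of the helper: a pending snapshot whose
// parent already equals freshHeadID is left alone; any other one is rebuilt
// and its previous manifest list path is reported as orphaned.
func rebuildAll(pending []snap, freshHeadID int64, rebuild func(snap, int64) snap) ([]snap, []string) {
	result := make([]snap, len(pending))
	copy(result, pending)

	var orphaned []string
	for i, s := range result {
		if s.parentID == freshHeadID {
			continue // parent unchanged, nothing to rewrite
		}
		orphaned = append(orphaned, s.manifestList)
		result[i] = rebuild(s, freshHeadID)
	}
	return result, orphaned
}

func main() {
	attempt := 1
	rebuild := func(s snap, head int64) snap {
		return snap{parentID: head, manifestList: fmt.Sprintf("snap-list-%d.avro", attempt)}
	}

	pending := []snap{{parentID: 1, manifestList: "snap-list-0.avro"}}
	rebuilt, orphaned := rebuildAll(pending, 2, rebuild)
	fmt.Println(rebuilt[0].manifestList, orphaned) // snap-list-1.avro [snap-list-0.avro]

	// A second call against the same head is a no-op.
	_, orphaned = rebuildAll(rebuilt, 2, rebuild)
	fmt.Println(len(orphaned)) // 0
}
```

Because each rebuild reports only the list it replaces, the file referenced by the attempt that finally commits never appears in the orphan list.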
   
   ## Test
   
   Run the reproducer from the bug report:
   
   ```
   go test ./table/ -run TestBugRepro_ManifestListNotInheritedOnRetry -v
   ```
   
   Expected after this fix:
   
   ```
   --- PASS: TestBugRepro_ManifestListNotInheritedOnRetry (0.11s)
   ```
   

