mzzz-zzm commented on issue #978:
URL: https://github.com/apache/iceberg-go/issues/978#issuecomment-4370030039

   ## Suggested fix
   
   `RowDelta.validate()` is changed to collect the partition tuple of each 
equality-delete file and pass those tuples to a new helper 
`validateNoConflictingDataFilesInPartitions`. That helper only flags concurrent 
data files whose partition tuple matches one of the equality-delete partitions, 
rather than matching everything via `AlwaysTrue{}`. For unpartitioned tables 
(or equality-delete files with empty partition tuples) the helper falls back to 
the existing `AlwaysTrue` behaviour to remain safe.
   
   This change is confined to **two files**: it modifies `RowDelta.validate()` and adds two helper functions. It is independent of the manifest list rebuild fix and the empty-table conflict bypass fix.
   
   ## Files changed
   
   | File | Role |
   |---|---|
   | `table/row_delta.go` | Collects eq-delete partition tuples; calls the new helper instead of using `AlwaysTrue` |
   | `table/conflict_validation.go` | Adds the `validateNoConflictingDataFilesInPartitions` and `partitionTupleKey` helpers |
   
   ## Diff
   
   ### `table/row_delta.go`
   
   ```diff
   -var hasEqDeletes bool
   +var eqDeletePartitions []map[int]any
    for _, f := range rd.delFiles {
        switch f.ContentType() {
        case iceberg.EntryContentPosDeletes:
                if ref := f.ReferencedDataFile(); ref != nil && *ref != "" {
                        referenced = append(referenced, *ref)
                }
        case iceberg.EntryContentEqDeletes:
   -            hasEqDeletes = true
   +            eqDeletePartitions = append(eqDeletePartitions, f.Partition())
        }
    }
   
    for _, path := range referenced {
        if _, found := cc.AddedDataFiles()[path]; found {
                return ErrConflictedWithUncommittedDeleteFiles
        }
    }
   
   -if hasEqDeletes {
   +if len(eqDeletePartitions) > 0 {
        level := readIsolationLevel(rd.txn.meta.props,
                WriteDeleteIsolationLevelKey, WriteDeleteIsolationLevelDefault)
   -    // Conservative: eq-deletes apply by predicate, and RowDelta
   -    // does not yet surface the bound predicate. AlwaysTrue is the
   -    // safest over-approximation and matches PR 2.3's contract on
   -    // validateNoConflictingDataFiles under SERIALIZABLE. Follow-up:
   -    // narrow with the actual eq-delete filter once it is carried
   -    // on the RowDelta.
   -    if err := validateNoConflictingDataFiles(cc, iceberg.AlwaysTrue{}, level); err != nil {
   +    // Use partition-aware conflict detection: only flag concurrent data
   +    // files whose partition tuple matches one of the equality delete
   +    // files' partition tuples.
   +    if err := validateNoConflictingDataFilesInPartitions(cc, eqDeletePartitions, level); err != nil {
                return err
        }
    }
   ```
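   A minimal, self-contained sketch of the collection step in the diff above, assuming simplified stand-in types: `contentType`, `deleteFile` and `collectDeleteTargets` are hypothetical and are not iceberg-go APIs. It illustrates why `len(eqDeletePartitions) > 0` can replace the old `hasEqDeletes` flag: every equality-delete file appends its tuple, even an empty or nil one, so an unpartitioned delete still triggers validation and reaches the `AlwaysTrue` fallback.

   ```go
   package main

   import "fmt"

   // contentType is a hypothetical stand-in for iceberg-go's entry content
   // enum; only the two delete kinds handled by the diff are modelled.
   type contentType int

   const (
   	posDeletes contentType = iota
   	eqDeletes
   )

   // deleteFile is a hypothetical, simplified delete file carrying only the
   // fields the collection loop reads.
   type deleteFile struct {
   	content    contentType
   	partition  map[int]any // nil or empty for unpartitioned files
   	referenced string      // referenced data file path (position deletes only)
   }

   // collectDeleteTargets sketches the loop in RowDelta.validate(): position
   // deletes contribute their referenced data file path (if set), equality
   // deletes always contribute their partition tuple, even an empty one.
   func collectDeleteTargets(files []deleteFile) (referenced []string, eqParts []map[int]any) {
   	for _, f := range files {
   		switch f.content {
   		case posDeletes:
   			if f.referenced != "" {
   				referenced = append(referenced, f.referenced)
   			}
   		case eqDeletes:
   			eqParts = append(eqParts, f.partition)
   		}
   	}
   	return referenced, eqParts
   }

   func main() {
   	files := []deleteFile{
   		{content: posDeletes, referenced: "data/a.parquet"},
   		{content: posDeletes}, // no referenced file: skipped
   		{content: eqDeletes, partition: map[int]any{1000: "hot"}},
   		{content: eqDeletes}, // unpartitioned: nil tuple is still collected
   	}
   	refs, parts := collectDeleteTargets(files)
   	fmt.Println(len(refs), len(parts)) // 1 2
   }
   ```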
   
   ### `table/conflict_validation.go`
   
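   Two new functions are added after `validateNoConflictingDataFiles`. `partitionTupleKey` calls `sort.Ints` and `fmt.Appendf` (Go 1.19+), so the file must import `sort`, plus `fmt` if it is not already imported: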
   
   ```go
   // validateNoConflictingDataFilesInPartitions is like
   // validateNoConflictingDataFiles but scoped to specific partition
   // tuples derived from equality-delete files. It only flags concurrent
   // data files whose partition tuple matches one of the provided tuples,
   // avoiding false conflicts when a concurrent append lands in a
   // completely different partition.
   //
   // When any provided partition tuple is empty (the table is
   // unpartitioned or the delete file covers all partitions), the check
   // falls back to AlwaysTrue — the equality delete could affect any row.
   //
   // Under IsolationSnapshot this validator is a no-op.
   func validateNoConflictingDataFilesInPartitions(ctx *conflictContext, eqDeletePartitions []map[int]any, level IsolationLevel) error {
        if level != IsolationSerializable {
                return nil
        }
        if len(ctx.concurrent) == 0 || len(eqDeletePartitions) == 0 {
                return nil
        }
   
        // If any eq-delete file is unpartitioned (empty tuple), the delete
        // could affect any row, so fall back to the conservative AlwaysTrue check.
        for _, p := range eqDeletePartitions {
                if len(p) == 0 {
                        return validateAddedDataFilesMatchingFilter(ctx, iceberg.AlwaysTrue{})
                }
        }
   
        // Build a set of partition tuple keys for O(1) lookup.
        partSet := make(map[string]struct{}, len(eqDeletePartitions))
        for _, p := range eqDeletePartitions {
                partSet[partitionTupleKey(p)] = struct{}{}
        }
   
        for _, snap := range ctx.concurrent {
                manifests, err := snap.Manifests(ctx.fs)
                if err != nil {
                        return fmt.Errorf("loading manifests for concurrent snapshot %d: %w", snap.SnapshotID, err)
                }
                for _, mf := range manifests {
                        if mf.ManifestContent() != iceberg.ManifestContentData {
                                continue
                        }
                        entries, err := mf.FetchEntries(ctx.fs, false)
                        if err != nil {
                                return fmt.Errorf("reading entries from manifest %s: %w", mf.FilePath(), err)
                        }
                        for _, e := range entries {
                                if e.Status() != iceberg.EntryStatusADDED || e.SnapshotID() != snap.SnapshotID {
                                        continue
                                }
                                if _, ok := partSet[partitionTupleKey(e.DataFile().Partition())]; ok {
                                        return fmt.Errorf("%w: snapshot %d added data file %s in eq-delete partition",
                                                ErrConflictingDataFiles, snap.SnapshotID, e.DataFile().FilePath())
                                }
                        }
                }
        }
        return nil
   }
   
   // partitionTupleKey returns a deterministic string key for a partition
   // tuple map. Keys are sorted by field id so maps with the same content
   // always produce the same string.
   func partitionTupleKey(p map[int]any) string {
        if len(p) == 0 {
                return ""
        }
        keys := make([]int, 0, len(p))
        for k := range p {
                keys = append(keys, k)
        }
        sort.Ints(keys)
        var buf []byte
        for _, k := range keys {
                buf = fmt.Appendf(buf, "%d=%v;", k, p[k])
        }
        return string(buf)
   }
   ```
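   `partitionTupleKey` sorts field ids because Go map iteration order is unspecified; formatting `p` while ranging over it directly could give the same tuple different keys on different calls. A quick check, reusing the helper body from the change unchanged (the field ids 1000/1001 and the values are made up for illustration):

   ```go
   package main

   import (
   	"fmt"
   	"sort"
   )

   // partitionTupleKey is the helper from the proposed change, unchanged.
   func partitionTupleKey(p map[int]any) string {
   	if len(p) == 0 {
   		return ""
   	}
   	keys := make([]int, 0, len(p))
   	for k := range p {
   		keys = append(keys, k)
   	}
   	sort.Ints(keys)
   	var buf []byte
   	for _, k := range keys {
   		buf = fmt.Appendf(buf, "%d=%v;", k, p[k])
   	}
   	return string(buf)
   }

   func main() {
   	// Two equal tuples written with their fields in a different order.
   	a := map[int]any{1000: "hot", 1001: int32(7)}
   	b := map[int]any{1001: int32(7), 1000: "hot"}
   	fmt.Println(partitionTupleKey(a))                          // 1000=hot;1001=7;
   	fmt.Println(partitionTupleKey(a) == partitionTupleKey(b)) // true
   }
   ```

   As long as `%v` renders equal values identically, equal tuples always produce equal keys, so a real match cannot be missed. Distinct tuples can collide only when their rendered text coincides (for example, a string value that itself contains `;` or `=`), and that would surface as an extra conflict rather than a missed one.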
   
   ## Behaviour matrix
   
   | eq-delete partition | concurrent data partition | isolation | result |
   |---|---|---|---|
   | `hot` | `hot` | SERIALIZABLE | `ErrConflictingDataFiles` (same partition: real conflict) |
   | `hot` | `cold` | SERIALIZABLE | no error (different partition: no conflict) |
   | `hot` | `hot` | SNAPSHOT | no error (snapshot isolation skips the check) |
   | unpartitioned | any | SERIALIZABLE | `ErrConflictingDataFiles` (AlwaysTrue fallback) |
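   The matrix can be checked against a small in-memory model of the decision rule. This is a sketch under stated assumptions: `isolation`, `tupleKey` and `conflicts` are hypothetical names, the partition tuples of concurrently added data files are passed in directly instead of being read from manifests, and 1000 is an assumed partition field id. It follows the same order of checks as `validateNoConflictingDataFilesInPartitions`: isolation level first, then the empty-tuple fallback, then exact tuple matching.

   ```go
   package main

   import (
   	"fmt"
   	"sort"
   )

   // isolation is a hypothetical stand-in for the table package's IsolationLevel.
   type isolation int

   const (
   	snapshotIsolation isolation = iota
   	serializableIsolation
   )

   // tupleKey renders a partition tuple as "id=value;" pairs in field-id order,
   // the same scheme as partitionTupleKey in the proposed change.
   func tupleKey(p map[int]any) string {
   	keys := make([]int, 0, len(p))
   	for k := range p {
   		keys = append(keys, k)
   	}
   	sort.Ints(keys)
   	var buf []byte
   	for _, k := range keys {
   		buf = fmt.Appendf(buf, "%d=%v;", k, p[k])
   	}
   	return string(buf)
   }

   // conflicts models the decision taken by
   // validateNoConflictingDataFilesInPartitions and reports whether the
   // commit would be rejected.
   func conflicts(level isolation, eqDeleteParts, addedParts []map[int]any) bool {
   	// Only SERIALIZABLE validates; with nothing on either side there is nothing to check.
   	if level != serializableIsolation || len(eqDeleteParts) == 0 || len(addedParts) == 0 {
   		return false
   	}
   	// An empty tuple means the delete may touch any row: AlwaysTrue fallback.
   	for _, p := range eqDeleteParts {
   		if len(p) == 0 {
   			return true
   		}
   	}
   	want := make(map[string]struct{}, len(eqDeleteParts))
   	for _, p := range eqDeleteParts {
   		want[tupleKey(p)] = struct{}{}
   	}
   	// Flag only added files whose tuple equals one of the eq-delete tuples.
   	for _, p := range addedParts {
   		if _, ok := want[tupleKey(p)]; ok {
   			return true
   		}
   	}
   	return false
   }

   func main() {
   	hot := map[int]any{1000: "hot"}
   	cold := map[int]any{1000: "cold"}
   	// One line per row of the behaviour matrix.
   	fmt.Println(conflicts(serializableIsolation, []map[int]any{hot}, []map[int]any{hot}))  // true
   	fmt.Println(conflicts(serializableIsolation, []map[int]any{hot}, []map[int]any{cold})) // false
   	fmt.Println(conflicts(snapshotIsolation, []map[int]any{hot}, []map[int]any{hot}))      // false
   	fmt.Println(conflicts(serializableIsolation, []map[int]any{{}}, []map[int]any{cold}))  // true
   }
   ```

   The last row shows the cost of the fallback: a single unpartitioned equality delete turns every concurrent append into a conflict, whatever partition it lands in.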
   
   ## Test
   
   Run the reproducer from the bug report:
   
   ```
   go test ./table/ -run TestBugRepro_RowDeltaFalseConflictDifferentPartition -v
   ```
   
   Expected after this fix:
   
   ```
   --- PASS: TestBugRepro_RowDeltaFalseConflictDifferentPartition (0.11s)
   ```
   

