mzzz-zzm commented on issue #978:
URL: https://github.com/apache/iceberg-go/issues/978#issuecomment-4370030039
## Suggested fix
The fix changes `RowDelta.validate()` to collect the partition tuple of each
equality-delete file and pass those tuples to a new helper,
`validateNoConflictingDataFilesInPartitions`. The helper flags only concurrent
data files whose partition tuple matches one of the equality-delete partitions,
instead of treating every concurrent data file as a conflict via `AlwaysTrue{}`.
For unpartitioned tables (or equality-delete files with an empty partition
tuple) the helper falls back to the existing `AlwaysTrue` behaviour, because
such a delete can match rows in any partition.
This change is confined to **two files** (one modified function plus two new
helpers) and is independent of the manifest list rebuild fix and the
empty-table conflict bypass fix.
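The narrowing can be modelled outside iceberg-go in a few lines. The sketch below is a simplified stand-in, not the PR's code: partition tuples are plain `map[int]any` values, the hypothetical `conflicts` function receives the eq-delete tuples and the tuples of concurrently added data files directly (the real helper walks concurrent manifests), and, purely as a shortcut for this sketch, it keys tuples with `fmt.Sprint`, which prints map entries in key-sorted order, instead of `partitionTupleKey`. The field id `1000` and the `hot`/`cold` values are illustrative.

```go
package main

import "fmt"

// conflicts reports whether any concurrently added data file falls in a
// partition touched by an equality delete. Hypothetical model only: it
// takes partition tuples directly instead of reading manifests.
func conflicts(eqDeleteParts, addedParts []map[int]any) bool {
	if len(eqDeleteParts) == 0 || len(addedParts) == 0 {
		return false
	}
	// An empty tuple means an unpartitioned (global) equality delete,
	// which can match rows anywhere, so any concurrent append conflicts.
	for _, p := range eqDeleteParts {
		if len(p) == 0 {
			return true
		}
	}
	// fmt.Sprint prints map entries sorted by key, so equal tuples
	// always produce equal keys.
	keys := make(map[string]struct{}, len(eqDeleteParts))
	for _, p := range eqDeleteParts {
		keys[fmt.Sprint(p)] = struct{}{}
	}
	for _, p := range addedParts {
		if _, ok := keys[fmt.Sprint(p)]; ok {
			return true
		}
	}
	return false
}

func main() {
	hot := map[int]any{1000: "hot"}
	cold := map[int]any{1000: "cold"}
	fmt.Println(conflicts([]map[int]any{hot}, []map[int]any{cold})) // false
	fmt.Println(conflicts([]map[int]any{hot}, []map[int]any{hot}))  // true
	fmt.Println(conflicts([]map[int]any{{}}, []map[int]any{cold}))  // true
}
```

The old behaviour corresponds to returning `true` whenever both slices are non-empty; the partition check is what lets the `hot`/`cold` case pass.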
## Files changed
| File | Role |
|---|---|
| `table/row_delta.go` | Collects eq-delete partition tuples; calls the new helper instead of validating against `AlwaysTrue` |
| `table/conflict_validation.go` | Adds the `validateNoConflictingDataFilesInPartitions` and `partitionTupleKey` helpers |
## Diff
### `table/row_delta.go`
```diff
-	var hasEqDeletes bool
+	var eqDeletePartitions []map[int]any
 	for _, f := range rd.delFiles {
 		switch f.ContentType() {
 		case iceberg.EntryContentPosDeletes:
 			if ref := f.ReferencedDataFile(); ref != nil && *ref != "" {
 				referenced = append(referenced, *ref)
 			}
 		case iceberg.EntryContentEqDeletes:
-			hasEqDeletes = true
+			eqDeletePartitions = append(eqDeletePartitions, f.Partition())
 		}
 	}
 	for _, path := range referenced {
 		if _, found := cc.AddedDataFiles()[path]; found {
 			return ErrConflictedWithUncommittedDeleteFiles
 		}
 	}
-	if hasEqDeletes {
+	if len(eqDeletePartitions) > 0 {
 		level := readIsolationLevel(rd.txn.meta.props,
 			WriteDeleteIsolationLevelKey, WriteDeleteIsolationLevelDefault)
-		// Conservative: eq-deletes apply by predicate, and RowDelta
-		// does not yet surface the bound predicate. AlwaysTrue is the
-		// safest over-approximation and matches PR 2.3's contract on
-		// validateNoConflictingDataFiles under SERIALIZABLE. Follow-up:
-		// narrow with the actual eq-delete filter once it is carried
-		// on the RowDelta.
-		if err := validateNoConflictingDataFiles(cc, iceberg.AlwaysTrue{}, level); err != nil {
+		// Use partition-aware conflict detection: only flag concurrent data
+		// files whose partition tuple matches one of the equality delete
+		// files' partition tuples.
+		if err := validateNoConflictingDataFilesInPartitions(cc, eqDeletePartitions, level); err != nil {
 			return err
 		}
 	}
```
### `table/conflict_validation.go`
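Two new functions are added after `validateNoConflictingDataFiles`.
`partitionTupleKey` calls `sort.Ints`, so the file's import block needs `sort`
(alongside `fmt`) if it does not already import it: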
```go
// validateNoConflictingDataFilesInPartitions is like
// validateNoConflictingDataFiles but scoped to specific partition
// tuples derived from equality-delete files. It only flags concurrent
// data files whose partition tuple matches one of the provided tuples,
// avoiding false conflicts when a concurrent append lands in a
// completely different partition.
//
// When any provided partition tuple is empty (the table is
// unpartitioned or the delete file covers all partitions), the check
// falls back to AlwaysTrue, because the equality delete could affect
// any row.
//
// Under IsolationSnapshot this validator is a no-op.
func validateNoConflictingDataFilesInPartitions(ctx *conflictContext,
	eqDeletePartitions []map[int]any, level IsolationLevel) error {
	if level != IsolationSerializable {
		return nil
	}
	if len(ctx.concurrent) == 0 || len(eqDeletePartitions) == 0 {
		return nil
	}

	// If any eq-delete file is unpartitioned (empty tuple), the delete
	// could affect any row, so fall back to the conservative AlwaysTrue
	// check.
	for _, p := range eqDeletePartitions {
		if len(p) == 0 {
			return validateAddedDataFilesMatchingFilter(ctx, iceberg.AlwaysTrue{})
		}
	}

	// Build a set of partition tuple keys for O(1) lookup.
	partSet := make(map[string]struct{}, len(eqDeletePartitions))
	for _, p := range eqDeletePartitions {
		partSet[partitionTupleKey(p)] = struct{}{}
	}

	for _, snap := range ctx.concurrent {
		manifests, err := snap.Manifests(ctx.fs)
		if err != nil {
			return fmt.Errorf("loading manifests for concurrent snapshot %d: %w",
				snap.SnapshotID, err)
		}
		for _, mf := range manifests {
			// Delete manifests cannot introduce new rows; skip them.
			if mf.ManifestContent() != iceberg.ManifestContentData {
				continue
			}
			entries, err := mf.FetchEntries(ctx.fs, false)
			if err != nil {
				return fmt.Errorf("reading entries from manifest %s: %w",
					mf.FilePath(), err)
			}
			for _, e := range entries {
				// Only data files added by this concurrent snapshot matter.
				if e.Status() != iceberg.EntryStatusADDED ||
					e.SnapshotID() != snap.SnapshotID {
					continue
				}
				if _, ok := partSet[partitionTupleKey(e.DataFile().Partition())]; ok {
					return fmt.Errorf("%w: snapshot %d added data file %s in eq-delete partition",
						ErrConflictingDataFiles, snap.SnapshotID, e.DataFile().FilePath())
				}
			}
		}
	}
	return nil
}

// partitionTupleKey returns a deterministic string key for a partition
// tuple map. Keys are sorted by field id so maps with the same content
// always produce the same string. Values are formatted with %v; two
// distinct tuples that happen to print identically would share a key,
// which can only add a spurious conflict, never hide a real one.
func partitionTupleKey(p map[int]any) string {
	if len(p) == 0 {
		return ""
	}
	keys := make([]int, 0, len(p))
	for k := range p {
		keys = append(keys, k)
	}
	sort.Ints(keys)
	var buf []byte
	for _, k := range keys {
		buf = fmt.Appendf(buf, "%d=%v;", k, p[k])
	}
	return string(buf)
}
```
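As a quick check of the determinism claim, the standalone program below copies `partitionTupleKey` unchanged and feeds it two maps with the same content. Go map iteration order is unspecified (and deliberately randomised), so the sort over field ids is what keeps the key stable. The field ids and values are illustrative only.

```go
package main

import (
	"fmt"
	"sort"
)

// partitionTupleKey is copied from the proposed conflict_validation.go.
func partitionTupleKey(p map[int]any) string {
	if len(p) == 0 {
		return ""
	}
	keys := make([]int, 0, len(p))
	for k := range p {
		keys = append(keys, k)
	}
	sort.Ints(keys)
	var buf []byte
	for _, k := range keys {
		buf = fmt.Appendf(buf, "%d=%v;", k, p[k])
	}
	return string(buf)
}

func main() {
	a := map[int]any{1000: "hot", 1001: 7}
	b := map[int]any{1001: 7, 1000: "hot"}
	fmt.Println(partitionTupleKey(a))                         // 1000=hot;1001=7;
	fmt.Println(partitionTupleKey(a) == partitionTupleKey(b)) // true
	fmt.Println(partitionTupleKey(nil) == "")                 // true
}
```

The empty-tuple case returning `""` is why the helper checks `len(p) == 0` before building the set: an unpartitioned delete is handled by the `AlwaysTrue` fallback rather than by key matching.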
## Behaviour matrix
| eq-delete partition | concurrent data partition | isolation | result |
|---|---|---|---|
| `hot` | `hot` | SERIALIZABLE | `ErrConflictingDataFiles` (same partition: real conflict) |
| `hot` | `cold` | SERIALIZABLE | no error (different partition: no conflict) |
| `hot` | `hot` | SNAPSHOT | no error (snapshot isolation skips the check) |
| unpartitioned | any | SERIALIZABLE | `ErrConflictingDataFiles` (`AlwaysTrue` fallback) |
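The matrix reduces to a small decision function. The table-driven sketch below encodes its four rows against a hypothetical `isConflict` helper, with partitions simplified to string labels and `""` standing for an unpartitioned equality delete. It models the expected outcomes only and does not call iceberg-go; the `isolation` type and its constants are stand-ins for the real `IsolationLevel` values.

```go
package main

import "fmt"

// isolation is a hypothetical stand-in for iceberg-go's IsolationLevel.
type isolation int

const (
	snapshotIso isolation = iota
	serializableIso
)

// isConflict models the expected result for one eq-delete partition and
// one concurrently added data file. Illustration only.
func isConflict(eqDeletePart, dataPart string, iso isolation) bool {
	if iso != serializableIso {
		return false // SNAPSHOT isolation skips the check entirely
	}
	if eqDeletePart == "" {
		return true // unpartitioned eq-delete: AlwaysTrue fallback
	}
	return eqDeletePart == dataPart
}

func main() {
	rows := []struct {
		eqDel, data string
		iso         isolation
		want        bool
	}{
		{"hot", "hot", serializableIso, true},
		{"hot", "cold", serializableIso, false},
		{"hot", "hot", snapshotIso, false},
		{"", "cold", serializableIso, true},
	}
	for _, r := range rows {
		got := isConflict(r.eqDel, r.data, r.iso)
		fmt.Printf("%q vs %q: conflict=%v (want %v)\n", r.eqDel, r.data, got, r.want)
	}
}
```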
## Test
Run the reproducer from the bug report:
```
go test ./table/ -run TestBugRepro_RowDeltaFalseConflictDifferentPartition -v
```
Expected after this fix:
```
--- PASS: TestBugRepro_RowDeltaFalseConflictDifferentPartition (0.11s)
```