mzzz-zzm commented on code in PR #983:
URL: https://github.com/apache/iceberg-go/pull/983#discussion_r3193629955


##########
table/conflict_validation.go:
##########
@@ -426,6 +426,89 @@ func validateNoConflictingDataFiles(ctx *conflictContext, 
filter iceberg.Boolean
        return validateAddedDataFilesMatchingFilter(ctx, filter)
 }
 
+// validateNoConflictingDataFilesInPartitions is like
+// validateNoConflictingDataFiles but scoped to specific partition
+// tuples derived from equality-delete files. It only flags concurrent
+// data files whose partition tuple matches one of the provided tuples,
+// avoiding false conflicts when a concurrent append lands in a
+// completely different partition.
+//
+// When any provided partition tuple is empty (the table is
+// unpartitioned or the delete file covers all partitions), the check
+// falls back to AlwaysTrue — the equality delete could affect any row.
+//
+// Under IsolationSnapshot this validator is a no-op.
+func validateNoConflictingDataFilesInPartitions(ctx *conflictContext, 
eqDeletePartitions []map[int]any, level IsolationLevel) error {
+       if level != IsolationSerializable {
+               return nil
+       }
+
+       if len(ctx.concurrent) == 0 || len(eqDeletePartitions) == 0 {
+               return nil
+       }
+
+       // If any eq-delete file is unpartitioned (empty tuple), the delete
+       // could affect any row — fall back to the conservative AlwaysTrue 
check.
+       for _, p := range eqDeletePartitions {
+               if len(p) == 0 {
+                       return validateAddedDataFilesMatchingFilter(ctx, 
iceberg.AlwaysTrue{})
+               }
+       }
+
+       // Build a set of partition tuple keys for O(1) lookup.
+       partSet := make(map[string]struct{}, len(eqDeletePartitions))
+       for _, p := range eqDeletePartitions {
+               partSet[partitionTupleKey(p)] = struct{}{}
+       }
+
+       for _, snap := range ctx.concurrent {
+               manifests, err := snap.Manifests(ctx.fs)
+               if err != nil {
+                       return fmt.Errorf("loading manifests for concurrent 
snapshot %d: %w", snap.SnapshotID, err)
+               }
+               for _, mf := range manifests {
+                       if mf.ManifestContent() != iceberg.ManifestContentData {
+                               continue
+                       }
+                       entries, err := mf.FetchEntries(ctx.fs, false)
+                       if err != nil {
+                               return fmt.Errorf("reading entries from 
manifest %s: %w", mf.FilePath(), err)
+                       }
+                       for _, e := range entries {
+                               if e.Status() != iceberg.EntryStatusADDED || 
e.SnapshotID() != snap.SnapshotID {
+                                       continue
+                               }
+                               if _, ok := 
partSet[partitionTupleKey(e.DataFile().Partition())]; ok {

Review Comment:
   This is resolved automatically by routing the check through
`validateAddedDataFilesMatchingFilter`. That helper calls
`buildPartitionProjection(specID, ...)` keyed by each concurrent manifest's
`PartitionSpecID`, so a concurrent file written under spec B is projected
against spec B's fields, not spec A's. The eq-delete filter is expressed in
row (source) space using `Reference(sourceFieldName)`, so it projects correctly
regardless of which spec the concurrent file was written under.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to