mzzz-zzm commented on code in PR #983:
URL: https://github.com/apache/iceberg-go/pull/983#discussion_r3195690281


##########
table/conflict_validation.go:
##########
@@ -426,6 +427,159 @@ func validateNoConflictingDataFiles(ctx *conflictContext, 
filter iceberg.Boolean
        return validateAddedDataFilesMatchingFilter(ctx, filter)
 }
 
+// validateNoConflictingDataFilesInPartitions is like
+// validateNoConflictingDataFiles but scoped to the partitions touched
+// by equality-delete files. It builds an OR-of-equalities filter from
+// the provided partition tuples and delegates to
+// validateAddedDataFilesMatchingFilter, which performs per-spec
+// projection, manifest-summary pruning, and type-aware evaluation via
+// iceberg.Literal — making it safe for UUID, decimal, binary, fixed,
+// and future partition types, and correct across partition-spec
+// evolution because each concurrent manifest is projected against its
+// own spec ID.
+//
+// Callers are responsible for ensuring the table is partitioned
+// (i.e. at least one partition field exists) before calling this
+// function. For unpartitioned tables, call
+// validateNoConflictingDataFiles(ctx, iceberg.AlwaysTrue{}, level)
+// directly.
+//
+// Under IsolationSnapshot this validator is a no-op.
+func validateNoConflictingDataFilesInPartitions(ctx *conflictContext, 
eqDeleteFiles []iceberg.DataFile, level IsolationLevel) error {
+       if level != IsolationSerializable {
+               return nil
+       }
+
+       if len(ctx.concurrent) == 0 || len(eqDeleteFiles) == 0 {
+               return nil
+       }
+
+       filter, err := eqDeletePartitionsToFilter(eqDeleteFiles, ctx.current)
+       if err != nil {
+               return fmt.Errorf("building partition conflict filter: %w", err)
+       }
+
+       return validateNoConflictingDataFiles(ctx, filter, level)
+}
+
+// eqDeletePartitionsToFilter converts equality-delete data files into an
+// OR-of-ANDs BooleanExpression in row (source) space, suitable for passing
+// to validateAddedDataFilesMatchingFilter.
+//
+// For each eq-delete file it resolves each partition field ID to the source
+// schema field name via the file's partition spec, then builds an EqualTo
+// predicate using Reference(sourceFieldName). Multiple fields within one
+// partition are AND-ed; multiple eq-delete files are OR-ed.
+//
+// The resulting expression is projected per-concurrent-manifest's spec ID
+// inside validateAddedDataFilesMatchingFilter (via buildPartitionProjection),
+// ensuring correct conflict detection even after partition-spec evolution.
+//
+// An empty partition tuple (unpartitioned delete) returns AlwaysTrue so the
+// caller falls back to the conservative full-table scan. Callers should
+// normally guard against calling this function for unpartitioned tables (see
+// RowDelta.validate).
+func eqDeletePartitionsToFilter(files []iceberg.DataFile, meta Metadata) 
(iceberg.BooleanExpression, error) {
+       terms := make([]iceberg.BooleanExpression, 0, len(files))
+       for _, f := range files {
+               p := f.Partition()
+               if len(p) == 0 {
+                       return iceberg.AlwaysTrue{}, nil
+               }
+
+               spec := meta.PartitionSpecByID(int(f.SpecID()))
+               if spec == nil {
+                       return nil, fmt.Errorf("partition spec ID %d not found 
in metadata", f.SpecID())
+               }
+
+               // Build partition field ID → PartitionField lookup for this 
spec.
+               partFieldByID := make(map[int]iceberg.PartitionField, 
spec.NumFields())
+               for _, pf := range spec.Fields() {
+                       partFieldByID[pf.FieldID] = pf
+               }
+
+               // Sort partition field IDs for deterministic expression order.
+               fieldIDs := make([]int, 0, len(p))
+               for id := range p {
+                       fieldIDs = append(fieldIDs, id)
+               }
+               sort.Ints(fieldIDs)
+
+               conjuncts := make([]iceberg.BooleanExpression, 0, len(p))
+               for _, partFieldID := range fieldIDs {
+                       pf, ok := partFieldByID[partFieldID]
+                       if !ok {
+                               return nil, fmt.Errorf("partition field ID %d 
not found in spec %d", partFieldID, f.SpecID())
+                       }
+
+                       // Resolve to source schema field to obtain the 
Reference name.
+                       sourceField, ok := 
meta.CurrentSchema().FindFieldByID(pf.SourceID())
+                       if !ok {
+                               return nil, fmt.Errorf("source field ID %d 
(partition field %q) not found in schema", pf.SourceID(), pf.Name)
+                       }
+
+                       lit, err := anyToLiteral(p[partFieldID])
+                       if err != nil {
+                               return nil, fmt.Errorf("partition field %q: 
%w", sourceField.Name, err)
+                       }
+
+                       conjuncts = append(conjuncts, 
iceberg.LiteralPredicate(iceberg.OpEQ, iceberg.Reference(sourceField.Name), 
lit))

Review Comment:
   Fixed. `eqDeletePartitionsToFilter` now detects non-identity transforms per 
partition field before building any row-space predicate. If any field in a 
file's partition spec uses a non-identity transform (bucket, day, hour, 
truncate, year, month), the function falls back to `AlwaysTrue{}` for that file 
— treating the eq-delete as table-wide, which is conservative but always 
correct. Because the per-file terms are OR-ed, a single such file makes the 
whole filter `AlwaysTrue{}`. When every field of a file's spec uses 
`IdentityTransform`, that file contributes the scoped row-space predicate as 
before.
   
   The full `PartitionSet`-style fix (partition-space predicate keyed by 
`(specID, partitionFieldName)` evaluated only against same-spec concurrent 
manifests) is deferred to a follow-up PR.
   
   Two regression tests lock the contract:
   - `TestRowDeltaValidate_BucketTransformFallsBackToAlwaysTrue`: 
`bucket[16](user_id)` partitioned table — concurrent data in bucket 1, 
eq-delete in bucket 1 → rejected.
   - `TestRowDeltaValidate_DayTransformFallsBackToAlwaysTrue`: `day(event_ts)` 
partitioned table — concurrent data in day 100, eq-delete in day 200 → still 
rejected (conservative; AlwaysTrue cannot distinguish days).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to