laskoviymishka commented on code in PR #983:
URL: https://github.com/apache/iceberg-go/pull/983#discussion_r3194249336


##########
table/conflict_validation.go:
##########
@@ -426,6 +427,159 @@ func validateNoConflictingDataFiles(ctx *conflictContext, 
filter iceberg.Boolean
        return validateAddedDataFilesMatchingFilter(ctx, filter)
 }
 
+// validateNoConflictingDataFilesInPartitions is like
+// validateNoConflictingDataFiles but scoped to the partitions touched
+// by equality-delete files. It builds an OR-of-equalities filter from
+// the provided partition tuples and delegates to
+// validateAddedDataFilesMatchingFilter, which performs per-spec
+// projection, manifest-summary pruning, and type-aware evaluation via
+// iceberg.Literal — making it safe for UUID, decimal, binary, fixed,
+// and future partition types, and correct across partition-spec
+// evolution because each concurrent manifest is projected against its
+// own spec ID.
+//
+// Callers are responsible for ensuring the table is partitioned
+// (i.e. at least one partition field exists) before calling this
+// function. For unpartitioned tables, call
+// validateNoConflictingDataFiles(ctx, iceberg.AlwaysTrue{}, level)
+// directly.
+//
+// Under IsolationSnapshot this validator is a no-op.
+func validateNoConflictingDataFilesInPartitions(ctx *conflictContext, 
eqDeleteFiles []iceberg.DataFile, level IsolationLevel) error {
+       if level != IsolationSerializable {
+               return nil
+       }
+
+       if len(ctx.concurrent) == 0 || len(eqDeleteFiles) == 0 {
+               return nil
+       }
+
+       filter, err := eqDeletePartitionsToFilter(eqDeleteFiles, ctx.current)
+       if err != nil {
+               return fmt.Errorf("building partition conflict filter: %w", err)
+       }
+
+       return validateNoConflictingDataFiles(ctx, filter, level)
+}
+
+// eqDeletePartitionsToFilter converts equality-delete data files into an
+// OR-of-ANDs BooleanExpression in row (source) space, suitable for passing
+// to validateAddedDataFilesMatchingFilter.
+//
+// For each eq-delete file it resolves each partition field ID to the source
+// schema field name via the file's partition spec, then builds an EqualTo
+// predicate using Reference(sourceFieldName). Multiple fields within one
+// partition are AND-ed; multiple eq-delete files are OR-ed.
+//
+// The resulting expression is projected per-concurrent-manifest's spec ID
+// inside validateAddedDataFilesMatchingFilter (via buildPartitionProjection),
+// ensuring correct conflict detection even after partition-spec evolution.
+//
+// An empty partition tuple (unpartitioned delete) returns AlwaysTrue so the
+// caller falls back to the conservative full-table scan. Callers should
+// normally guard against calling this function for unpartitioned tables (see
+// RowDelta.validate).
+func eqDeletePartitionsToFilter(files []iceberg.DataFile, meta Metadata) 
(iceberg.BooleanExpression, error) {
+       terms := make([]iceberg.BooleanExpression, 0, len(files))
+       for _, f := range files {
+               p := f.Partition()
+               if len(p) == 0 {
+                       return iceberg.AlwaysTrue{}, nil
+               }
+
+               spec := meta.PartitionSpecByID(int(f.SpecID()))
+               if spec == nil {
+                       return nil, fmt.Errorf("partition spec ID %d not found 
in metadata", f.SpecID())
+               }
+
+               // Build partition field ID → PartitionField lookup for this 
spec.
+               partFieldByID := make(map[int]iceberg.PartitionField, 
spec.NumFields())
+               for _, pf := range spec.Fields() {
+                       partFieldByID[pf.FieldID] = pf
+               }
+
+               // Sort partition field IDs for deterministic expression order.
+               fieldIDs := make([]int, 0, len(p))
+               for id := range p {
+                       fieldIDs = append(fieldIDs, id)
+               }
+               sort.Ints(fieldIDs)
+
+               conjuncts := make([]iceberg.BooleanExpression, 0, len(p))
+               for _, partFieldID := range fieldIDs {
+                       pf, ok := partFieldByID[partFieldID]
+                       if !ok {
+                               return nil, fmt.Errorf("partition field ID %d 
not found in spec %d", partFieldID, f.SpecID())
+                       }
+
+                       // Resolve to source schema field to obtain the 
Reference name.
+                       sourceField, ok := 
meta.CurrentSchema().FindFieldByID(pf.SourceID())
+                       if !ok {
+                               return nil, fmt.Errorf("source field ID %d 
(partition field %q) not found in schema", pf.SourceID(), pf.Name)
+                       }
+
+                       lit, err := anyToLiteral(p[partFieldID])
+                       if err != nil {
+                               return nil, fmt.Errorf("partition field %q: 
%w", sourceField.Name, err)
+                       }
+
+                       conjuncts = append(conjuncts, 
iceberg.LiteralPredicate(iceberg.OpEQ, iceberg.Reference(sourceField.Name), 
lit))

Review Comment:
   I think there's a subtle issue here for non-identity transforms — wanted to 
flag it because the test suite only exercises `IdentityTransform` so it might 
not surface naturally.
   
   The literal we pass in (`lit`) is the post-transform partition value (a 
bucket index from `BucketTransform`, days-from-epoch from `DayTransform`, 
etc.), but the predicate is anchored against the *source* column via 
`Reference(sourceField.Name)`. Downstream, 
`validateAddedDataFilesMatchingFilter` calls `BucketTransform.Project` 
(`transforms.go:366`), which does `transformLiteral(transformer, p.Literal())` 
— re-bucketing what's already a bucket index.
   
   Quick repro on `BucketTransform{NumBuckets: 16}` over `int64`:
   ```
   user_id=12345 → bucket=1   (what DataFile.Partition() stores)
   user_id=1     → bucket=4   (Project re-buckets the stored index 1 as a user_id)
   ```
   The eq-delete is in bucket 1, but the projected predicate matches bucket 4 — 
so a concurrent data file in bucket 1 silently passes validation. Same shape 
for `day(ts)`, 
`hour(ts)`, `truncate[K]`, `year`/`month`. `IdentityTransform` happens to work 
because `identity(identity(x)) == x`, which is why the existing spec-evolution 
test is green.
   
   A couple of ways to handle this — what do you think?
   * branch on `pf.Transform`: identity → keep this row-space predicate, 
non-identity → build a partition-space predicate (`Reference(pf.Name) == lit` 
keyed by `(specID, pf.Name)`) evaluated only against same-spec concurrent 
manifests, with cross-spec as a conservative fallback for that file — close to 
Java's `PartitionSet`.
   * narrower scope: explicitly reject non-identity transforms here with a 
clear error and document this as identity-only for now.
   
   A couple of regression tests under `bucket[N]` and `day(ts)` would lock the 
contract either way.



##########
table/conflict_validation.go:
##########
@@ -426,6 +427,159 @@ func validateNoConflictingDataFiles(ctx *conflictContext, 
filter iceberg.Boolean
        return validateAddedDataFilesMatchingFilter(ctx, filter)
 }
 
+// validateNoConflictingDataFilesInPartitions is like
+// validateNoConflictingDataFiles but scoped to the partitions touched
+// by equality-delete files. It builds an OR-of-equalities filter from
+// the provided partition tuples and delegates to
+// validateAddedDataFilesMatchingFilter, which performs per-spec
+// projection, manifest-summary pruning, and type-aware evaluation via
+// iceberg.Literal — making it safe for UUID, decimal, binary, fixed,
+// and future partition types, and correct across partition-spec
+// evolution because each concurrent manifest is projected against its
+// own spec ID.
+//
+// Callers are responsible for ensuring the table is partitioned
+// (i.e. at least one partition field exists) before calling this
+// function. For unpartitioned tables, call
+// validateNoConflictingDataFiles(ctx, iceberg.AlwaysTrue{}, level)
+// directly.
+//
+// Under IsolationSnapshot this validator is a no-op.
+func validateNoConflictingDataFilesInPartitions(ctx *conflictContext, 
eqDeleteFiles []iceberg.DataFile, level IsolationLevel) error {
+       if level != IsolationSerializable {
+               return nil
+       }
+
+       if len(ctx.concurrent) == 0 || len(eqDeleteFiles) == 0 {
+               return nil
+       }
+
+       filter, err := eqDeletePartitionsToFilter(eqDeleteFiles, ctx.current)
+       if err != nil {
+               return fmt.Errorf("building partition conflict filter: %w", err)
+       }
+
+       return validateNoConflictingDataFiles(ctx, filter, level)
+}
+
+// eqDeletePartitionsToFilter converts equality-delete data files into an
+// OR-of-ANDs BooleanExpression in row (source) space, suitable for passing
+// to validateAddedDataFilesMatchingFilter.
+//
+// For each eq-delete file it resolves each partition field ID to the source
+// schema field name via the file's partition spec, then builds an EqualTo
+// predicate using Reference(sourceFieldName). Multiple fields within one
+// partition are AND-ed; multiple eq-delete files are OR-ed.
+//
+// The resulting expression is projected per-concurrent-manifest's spec ID
+// inside validateAddedDataFilesMatchingFilter (via buildPartitionProjection),
+// ensuring correct conflict detection even after partition-spec evolution.
+//
+// An empty partition tuple (unpartitioned delete) returns AlwaysTrue so the
+// caller falls back to the conservative full-table scan. Callers should
+// normally guard against calling this function for unpartitioned tables (see
+// RowDelta.validate).
+func eqDeletePartitionsToFilter(files []iceberg.DataFile, meta Metadata) 
(iceberg.BooleanExpression, error) {
+       terms := make([]iceberg.BooleanExpression, 0, len(files))
+       for _, f := range files {
+               p := f.Partition()
+               if len(p) == 0 {
+                       return iceberg.AlwaysTrue{}, nil
+               }
+
+               spec := meta.PartitionSpecByID(int(f.SpecID()))
+               if spec == nil {
+                       return nil, fmt.Errorf("partition spec ID %d not found 
in metadata", f.SpecID())
+               }
+
+               // Build partition field ID → PartitionField lookup for this 
spec.
+               partFieldByID := make(map[int]iceberg.PartitionField, 
spec.NumFields())
+               for _, pf := range spec.Fields() {
+                       partFieldByID[pf.FieldID] = pf
+               }
+
+               // Sort partition field IDs for deterministic expression order.
+               fieldIDs := make([]int, 0, len(p))
+               for id := range p {
+                       fieldIDs = append(fieldIDs, id)
+               }
+               sort.Ints(fieldIDs)
+
+               conjuncts := make([]iceberg.BooleanExpression, 0, len(p))
+               for _, partFieldID := range fieldIDs {
+                       pf, ok := partFieldByID[partFieldID]
+                       if !ok {
+                               return nil, fmt.Errorf("partition field ID %d 
not found in spec %d", partFieldID, f.SpecID())
+                       }
+
+                       // Resolve to source schema field to obtain the 
Reference name.
+                       sourceField, ok := 
meta.CurrentSchema().FindFieldByID(pf.SourceID())
+                       if !ok {
+                               return nil, fmt.Errorf("source field ID %d 
(partition field %q) not found in schema", pf.SourceID(), pf.Name)
+                       }
+
+                       lit, err := anyToLiteral(p[partFieldID])
+                       if err != nil {
+                               return nil, fmt.Errorf("partition field %q: 
%w", sourceField.Name, err)
+                       }
+
+                       conjuncts = append(conjuncts, 
iceberg.LiteralPredicate(iceberg.OpEQ, iceberg.Reference(sourceField.Name), 
lit))
+               }
+
+               if len(conjuncts) == 1 {
+                       terms = append(terms, conjuncts[0])
+               } else {
+                       terms = append(terms, iceberg.NewAnd(conjuncts[0], 
conjuncts[1], conjuncts[2:]...))
+               }
+       }
+
+       if len(terms) == 0 {
+               return iceberg.AlwaysTrue{}, nil
+       }
+
+       if len(terms) == 1 {
+               return terms[0], nil
+       }
+
+       return iceberg.NewOr(terms[0], terms[1], terms[2:]...), nil
+}
+
+// anyToLiteral converts a dynamically-typed partition value (as
+// stored in iceberg.DataFile.Partition()) to an iceberg.Literal.
+// The supported types mirror the iceberg.LiteralType constraint.
+func anyToLiteral(v any) (iceberg.Literal, error) {
+       switch val := v.(type) {
+       case bool:
+               return iceberg.NewLiteral(val), nil
+       case int32:
+               return iceberg.NewLiteral(val), nil
+       case int64:
+               return iceberg.NewLiteral(val), nil
+       case float32:
+               return iceberg.NewLiteral(val), nil
+       case float64:
+               return iceberg.NewLiteral(val), nil
+       case string:
+               return iceberg.NewLiteral(val), nil
+       case []byte:
+               return iceberg.NewLiteral(val), nil
+       case iceberg.Date:
+               return iceberg.NewLiteral(val), nil
+       case iceberg.Time:
+               return iceberg.NewLiteral(val), nil
+       case iceberg.Timestamp:
+               return iceberg.NewLiteral(val), nil
+       case iceberg.TimestampNano:
+               return iceberg.NewLiteral(val), nil
+       case iceberg.Decimal:

Review Comment:
   Tiny type-asymmetry I noticed while tracing the round-trip:
   
   `convertAvroValueToIcebergType` at `manifest.go:1800` returns 
`iceberg.DecimalLiteral{Scale, Val}`, which is `type DecimalLiteral Decimal` — 
a *named type derived from* `Decimal`, not `Decimal` itself. Go type switches 
match exact dynamic type, so this arm doesn't fire on a round-tripped value — 
it falls through to `default:` with `"unsupported partition value type 
DecimalLiteral"`.
   
   Since `AddDeletes` takes `iceberg.DataFile`, any flow that re-uses a 
manifest-read file as input would hit this on decimal-partitioned tables. Two 
options I can think of:
   * add `case iceberg.DecimalLiteral:` here
   * normalize `convertAvroValueToIcebergType` to return `Decimal` consistently 
with the writer side
   
   Fwiw `TestAnyToLiteral_SupportedTypes` currently skips Decimal — adding a 
round-trip case once this is settled would catch any future drift.



##########
table/conflict_validation.go:
##########
@@ -426,6 +427,159 @@ func validateNoConflictingDataFiles(ctx *conflictContext, 
filter iceberg.Boolean
        return validateAddedDataFilesMatchingFilter(ctx, filter)
 }
 
+// validateNoConflictingDataFilesInPartitions is like
+// validateNoConflictingDataFiles but scoped to the partitions touched
+// by equality-delete files. It builds an OR-of-equalities filter from
+// the provided partition tuples and delegates to
+// validateAddedDataFilesMatchingFilter, which performs per-spec
+// projection, manifest-summary pruning, and type-aware evaluation via
+// iceberg.Literal — making it safe for UUID, decimal, binary, fixed,
+// and future partition types, and correct across partition-spec
+// evolution because each concurrent manifest is projected against its
+// own spec ID.
+//
+// Callers are responsible for ensuring the table is partitioned
+// (i.e. at least one partition field exists) before calling this
+// function. For unpartitioned tables, call
+// validateNoConflictingDataFiles(ctx, iceberg.AlwaysTrue{}, level)
+// directly.
+//
+// Under IsolationSnapshot this validator is a no-op.
+func validateNoConflictingDataFilesInPartitions(ctx *conflictContext, 
eqDeleteFiles []iceberg.DataFile, level IsolationLevel) error {
+       if level != IsolationSerializable {
+               return nil
+       }
+
+       if len(ctx.concurrent) == 0 || len(eqDeleteFiles) == 0 {
+               return nil
+       }
+
+       filter, err := eqDeletePartitionsToFilter(eqDeleteFiles, ctx.current)
+       if err != nil {
+               return fmt.Errorf("building partition conflict filter: %w", err)
+       }
+
+       return validateNoConflictingDataFiles(ctx, filter, level)
+}
+
+// eqDeletePartitionsToFilter converts equality-delete data files into an
+// OR-of-ANDs BooleanExpression in row (source) space, suitable for passing
+// to validateAddedDataFilesMatchingFilter.
+//
+// For each eq-delete file it resolves each partition field ID to the source
+// schema field name via the file's partition spec, then builds an EqualTo
+// predicate using Reference(sourceFieldName). Multiple fields within one
+// partition are AND-ed; multiple eq-delete files are OR-ed.
+//
+// The resulting expression is projected per-concurrent-manifest's spec ID
+// inside validateAddedDataFilesMatchingFilter (via buildPartitionProjection),
+// ensuring correct conflict detection even after partition-spec evolution.
+//
+// An empty partition tuple (unpartitioned delete) returns AlwaysTrue so the
+// caller falls back to the conservative full-table scan. Callers should
+// normally guard against calling this function for unpartitioned tables (see
+// RowDelta.validate).
+func eqDeletePartitionsToFilter(files []iceberg.DataFile, meta Metadata) 
(iceberg.BooleanExpression, error) {
+       terms := make([]iceberg.BooleanExpression, 0, len(files))
+       for _, f := range files {
+               p := f.Partition()
+               if len(p) == 0 {
+                       return iceberg.AlwaysTrue{}, nil
+               }
+
+               spec := meta.PartitionSpecByID(int(f.SpecID()))
+               if spec == nil {
+                       return nil, fmt.Errorf("partition spec ID %d not found 
in metadata", f.SpecID())
+               }
+
+               // Build partition field ID → PartitionField lookup for this 
spec.
+               partFieldByID := make(map[int]iceberg.PartitionField, 
spec.NumFields())
+               for _, pf := range spec.Fields() {
+                       partFieldByID[pf.FieldID] = pf
+               }
+
+               // Sort partition field IDs for deterministic expression order.
+               fieldIDs := make([]int, 0, len(p))
+               for id := range p {
+                       fieldIDs = append(fieldIDs, id)
+               }
+               sort.Ints(fieldIDs)
+
+               conjuncts := make([]iceberg.BooleanExpression, 0, len(p))
+               for _, partFieldID := range fieldIDs {
+                       pf, ok := partFieldByID[partFieldID]
+                       if !ok {
+                               return nil, fmt.Errorf("partition field ID %d 
not found in spec %d", partFieldID, f.SpecID())
+                       }
+
+                       // Resolve to source schema field to obtain the 
Reference name.
+                       sourceField, ok := 
meta.CurrentSchema().FindFieldByID(pf.SourceID())
+                       if !ok {
+                               return nil, fmt.Errorf("source field ID %d 
(partition field %q) not found in schema", pf.SourceID(), pf.Name)
+                       }
+
+                       lit, err := anyToLiteral(p[partFieldID])
+                       if err != nil {
+                               return nil, fmt.Errorf("partition field %q: 
%w", sourceField.Name, err)
+                       }
+
+                       conjuncts = append(conjuncts, 
iceberg.LiteralPredicate(iceberg.OpEQ, iceberg.Reference(sourceField.Name), 
lit))
+               }
+
+               if len(conjuncts) == 1 {
+                       terms = append(terms, conjuncts[0])
+               } else {
+                       terms = append(terms, iceberg.NewAnd(conjuncts[0], 
conjuncts[1], conjuncts[2:]...))
+               }
+       }
+
+       if len(terms) == 0 {
+               return iceberg.AlwaysTrue{}, nil
+       }
+
+       if len(terms) == 1 {
+               return terms[0], nil
+       }
+
+       return iceberg.NewOr(terms[0], terms[1], terms[2:]...), nil
+}
+
+// anyToLiteral converts a dynamically-typed partition value (as
+// stored in iceberg.DataFile.Partition()) to an iceberg.Literal.
+// The supported types mirror the iceberg.LiteralType constraint.
+func anyToLiteral(v any) (iceberg.Literal, error) {
+       switch val := v.(type) {
+       case bool:
+               return iceberg.NewLiteral(val), nil
+       case int32:
+               return iceberg.NewLiteral(val), nil
+       case int64:
+               return iceberg.NewLiteral(val), nil
+       case float32:
+               return iceberg.NewLiteral(val), nil
+       case float64:
+               return iceberg.NewLiteral(val), nil
+       case string:
+               return iceberg.NewLiteral(val), nil
+       case []byte:
+               return iceberg.NewLiteral(val), nil
+       case iceberg.Date:
+               return iceberg.NewLiteral(val), nil
+       case iceberg.Time:
+               return iceberg.NewLiteral(val), nil
+       case iceberg.Timestamp:
+               return iceberg.NewLiteral(val), nil
+       case iceberg.TimestampNano:

Review Comment:
   Related to the Decimal one above — `convertAvroValueToIcebergType` 
(`manifest.go:1756-1817`) has cases for `TimestampMillis` and `TimestampMicros` 
but none for nanos, so a nanosecond-timestamp partition value arrives here as a 
raw `int64` and matches `case int64:` first. This `case iceberg.TimestampNano:` 
arm ends up unreachable from the read path.
   
   Probably worth either adding a `TimestampNanos` case to 
`convertAvroValueToIcebergType` so reads return `iceberg.TimestampNano` and 
this arm fires, or dropping this arm and noting that `TimestampNano` partitions 
aren't supported here yet. Same as Decimal, the unit-test table 
(`TestAnyToLiteral_SupportedTypes`) also skips this case, so it's not caught 
today.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to