This is an automated email from the ASF dual-hosted git repository.

laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 158255c0 fix(occ): RowDelta uses partition-scoped conflict check 
instead of AlwaysTrue (#983)
158255c0 is described below

commit 158255c00913a0adea8f7c30fabfb0b76141b02b
Author: Masa <[email protected]>
AuthorDate: Thu May 7 01:05:46 2026 +1000

    fix(occ): RowDelta uses partition-scoped conflict check instead of 
AlwaysTrue (#983)
    
    Fixes #978
    ## Problem
    
    `RowDelta.validate` passes `iceberg.AlwaysTrue{}` to
    `validateNoConflictingDataFiles` whenever equality-delete files are
    present. This means any concurrent append to **any** partition is
    treated as a conflict, even when it lands in a completely different
    partition from the equality-deletes. Under serializable isolation this
    causes spurious `ErrConflictingDataFiles` errors for workloads that
    write to multiple independent partitions concurrently.
    
    ## Fix
    
    `RowDelta` now collects the partition tuples of all equality-delete
    files it adds (`eqDeletePartitions`). A new validator,
    `validateNoConflictingDataFilesInPartitions`, checks concurrent data
    files only in those specific partition tuples:
    
    - If the partition set is empty (no eq-deletes), the check is skipped.
    - If any eq-delete is unpartitioned (empty tuple), it falls back to the
    conservative `AlwaysTrue` check, preserving existing safety.
    - Otherwise, only concurrent files in a matching partition are flagged.
    
    ## Files changed
    
    - `manifest.go`: `convertAvroValueToIcebergType` handles
    `timestamp-nanos` partition values
    - `table/row_delta.go`: collect `eqDeletePartitions` instead of
    `hasEqDeletes`
    - `table/conflict_validation.go`:
    `validateNoConflictingDataFilesInPartitions` + `eqDeletePartitionsToFilter`
    + `anyToLiteral`
    - `table/partition_conflict_test.go`: unit tests for both new functions
---
 manifest.go                      |   6 +
 table/conflict_validation.go     | 180 ++++++++++
 table/partition_conflict_test.go | 704 +++++++++++++++++++++++++++++++++++++++
 table/row_delta.go               |  56 ++--
 4 files changed, 927 insertions(+), 19 deletions(-)

diff --git a/manifest.go b/manifest.go
index 82de7c1c..c79222d1 100644
--- a/manifest.go
+++ b/manifest.go
@@ -1851,6 +1851,12 @@ func (d *dataFile) convertAvroValueToIcebergType(v any, 
fieldID int) any {
                        }
 
                        return Timestamp(v.(int64))
+               case atype.TimestampNanos:
+                       if val, ok := v.(time.Time); ok {
+                               return TimestampNano(val.UTC().UnixNano())
+                       }
+
+                       return TimestampNano(v.(int64))
                case atype.Decimal:
                        if r, ok := v.(*big.Rat); ok {
                                scale := d.fieldIDToFixedSize[fieldID]
diff --git a/table/conflict_validation.go b/table/conflict_validation.go
index 1e12eace..991e6b31 100644
--- a/table/conflict_validation.go
+++ b/table/conflict_validation.go
@@ -53,6 +53,7 @@ import (
 
        "github.com/apache/iceberg-go"
        iceio "github.com/apache/iceberg-go/io"
+       "github.com/google/uuid"
 )
 
 // IsolationLevel controls how strictly a commit rejects concurrent
@@ -425,6 +426,185 @@ func validateNoConflictingDataFiles(ctx *conflictContext, 
filter iceberg.Boolean
        return validateAddedDataFilesMatchingFilter(ctx, filter)
 }
 
+// validateNoConflictingDataFilesInPartitions narrows the concurrent
+// data-file conflict check to the partitions referenced by the given
+// equality-delete files.
+//
+// The partition tuples of eqDeleteFiles are turned into a filter
+// expression by eqDeletePartitionsToFilter (resolved against
+// ctx.current), and that filter is handed to
+// validateNoConflictingDataFiles together with level.
+//
+// The validator returns nil without building a filter when level is not
+// IsolationSerializable, when ctx.concurrent is empty, or when
+// eqDeleteFiles is empty. A failure to build the filter is wrapped and
+// returned to the caller.
+func validateNoConflictingDataFilesInPartitions(ctx *conflictContext, eqDeleteFiles []iceberg.DataFile, level IsolationLevel) error {
+       switch {
+       case level != IsolationSerializable:
+               // Every level other than serializable skips this validation.
+               return nil
+       case len(ctx.concurrent) == 0, len(eqDeleteFiles) == 0:
+               // Nothing was committed concurrently, or there is nothing to protect.
+               return nil
+       }
+
+       partitionFilter, buildErr := eqDeletePartitionsToFilter(eqDeleteFiles, ctx.current)
+       if buildErr != nil {
+               return fmt.Errorf("building partition conflict filter: %w", buildErr)
+       }
+
+       return validateNoConflictingDataFiles(ctx, partitionFilter, level)
+}
+
+// eqDeletePartitionsToFilter converts equality-delete data files into an
+// OR-of-ANDs BooleanExpression in row (source) space, suitable for passing
+// to validateAddedDataFilesMatchingFilter.
+//
+// For each eq-delete file it resolves each partition field ID to the source
+// schema field name via the file's partition spec, then builds an EqualTo
+// predicate using Reference(sourceFieldName). Multiple fields within one
+// partition are AND-ed; multiple eq-delete files are OR-ed.
+//
+// The resulting expression is projected per-concurrent-manifest's spec ID
+// inside validateAddedDataFilesMatchingFilter (via buildPartitionProjection),
+// ensuring correct conflict detection even after partition-spec evolution.
+//
+// An empty partition tuple (unpartitioned delete) returns AlwaysTrue so the
+// caller falls back to the conservative full-table scan. Callers should
+// normally guard against calling this function for unpartitioned tables (see
+// RowDelta.validate).
+func eqDeletePartitionsToFilter(files []iceberg.DataFile, meta Metadata) 
(iceberg.BooleanExpression, error) {
+       terms := make([]iceberg.BooleanExpression, 0, len(files))
+       for _, f := range files {
+               p := f.Partition()
+               if len(p) == 0 {
+                       return iceberg.AlwaysTrue{}, nil
+               }
+
+               spec := meta.PartitionSpecByID(int(f.SpecID()))
+               if spec == nil {
+                       return nil, fmt.Errorf("partition spec ID %d not found 
in metadata", f.SpecID())
+               }
+
+               // Build partition field ID → PartitionField lookup for this 
spec.
+               partFieldByID := make(map[int]iceberg.PartitionField, 
spec.NumFields())
+               for _, pf := range spec.Fields() {
+                       partFieldByID[pf.FieldID] = pf
+               }
+
+               // Sort partition field IDs for deterministic expression order.
+               fieldIDs := make([]int, 0, len(p))
+               for id := range p {
+                       fieldIDs = append(fieldIDs, id)
+               }
+               sort.Ints(fieldIDs)
+
+               // Non-identity transforms (bucket, day, hour, truncate, year, 
month) store
+               // post-transform values in DataFile.Partition(). Building a 
row-space predicate
+               // with those values would cause 
validateAddedDataFilesMatchingFilter to
+               // re-apply the transform, producing wrong matches 
(double-transformation).
+               // Fall back to AlwaysTrue (conservative: treat the eq-delete 
as table-wide)
+               // until a full PartitionSet-style partition-space approach is 
added.
+               identityOnly := true
+               for _, fid := range fieldIDs {
+                       if pf, ok := partFieldByID[fid]; ok {
+                               if _, isIdentity := 
pf.Transform.(iceberg.IdentityTransform); !isIdentity {
+                                       identityOnly = false
+
+                                       break
+                               }
+                       }
+               }
+               if !identityOnly {
+                       terms = append(terms, iceberg.AlwaysTrue{})
+
+                       continue
+               }
+
+               conjuncts := make([]iceberg.BooleanExpression, 0, len(p))
+               for _, partFieldID := range fieldIDs {
+                       pf, ok := partFieldByID[partFieldID]
+                       if !ok {
+                               return nil, fmt.Errorf("partition field ID %d 
not found in spec %d", partFieldID, f.SpecID())
+                       }
+
+                       // Resolve to source schema field to obtain the 
Reference name.
+                       sourceField, ok := 
meta.CurrentSchema().FindFieldByID(pf.SourceID())
+                       if !ok {
+                               return nil, fmt.Errorf("source field ID %d 
(partition field %q) not found in schema", pf.SourceID(), pf.Name)
+                       }
+
+                       lit, err := anyToLiteral(p[partFieldID])
+                       if err != nil {
+                               return nil, fmt.Errorf("partition field %q: 
%w", sourceField.Name, err)
+                       }
+
+                       conjuncts = append(conjuncts, 
iceberg.LiteralPredicate(iceberg.OpEQ, iceberg.Reference(sourceField.Name), 
lit))
+               }
+
+               if len(conjuncts) == 1 {
+                       terms = append(terms, conjuncts[0])
+               } else {
+                       terms = append(terms, iceberg.NewAnd(conjuncts[0], 
conjuncts[1], conjuncts[2:]...))
+               }
+       }
+
+       if len(terms) == 0 {
+               return iceberg.AlwaysTrue{}, nil
+       }
+
+       if len(terms) == 1 {
+               return terms[0], nil
+       }
+
+       return iceberg.NewOr(terms[0], terms[1], terms[2:]...), nil
+}
+
+// anyToLiteral wraps a dynamically-typed partition value, such as an
+// entry of the map returned by iceberg.DataFile.Partition(), in an
+// iceberg.Literal.
+//
+// Each supported concrete type is passed to iceberg.NewLiteral unchanged,
+// except iceberg.DecimalLiteral, which is first converted to
+// iceberg.Decimal. Any other type, including a nil value, produces an
+// "unsupported partition value type" error naming the dynamic type.
+func anyToLiteral(v any) (iceberg.Literal, error) {
+       var lit iceberg.Literal
+
+       switch typed := v.(type) {
+       // Plain Go scalars and byte slices.
+       case string:
+               lit = iceberg.NewLiteral(typed)
+       case bool:
+               lit = iceberg.NewLiteral(typed)
+       case int32:
+               lit = iceberg.NewLiteral(typed)
+       case int64:
+               lit = iceberg.NewLiteral(typed)
+       case float32:
+               lit = iceberg.NewLiteral(typed)
+       case float64:
+               lit = iceberg.NewLiteral(typed)
+       case []byte:
+               lit = iceberg.NewLiteral(typed)
+       case uuid.UUID:
+               lit = iceberg.NewLiteral(typed)
+
+       // Iceberg temporal types.
+       case iceberg.Date:
+               lit = iceberg.NewLiteral(typed)
+       case iceberg.Time:
+               lit = iceberg.NewLiteral(typed)
+       case iceberg.Timestamp:
+               lit = iceberg.NewLiteral(typed)
+       case iceberg.TimestampNano:
+               lit = iceberg.NewLiteral(typed)
+
+       // Decimals arrive either as Decimal or as the named type
+       // DecimalLiteral; the latter is converted before wrapping.
+       case iceberg.Decimal:
+               lit = iceberg.NewLiteral(typed)
+       case iceberg.DecimalLiteral:
+               lit = iceberg.NewLiteral(iceberg.Decimal(typed))
+
+       default:
+               return nil, fmt.Errorf("unsupported partition value type %T", v)
+       }
+
+       return lit, nil
+}
+
 // validateNoNewDeletesForRewrittenFiles rejects the commit if any
 // concurrent snapshot added delete files that would be lost when the
 // committer's rewrite replaces a data file.
diff --git a/table/partition_conflict_test.go b/table/partition_conflict_test.go
new file mode 100644
index 00000000..aa294724
--- /dev/null
+++ b/table/partition_conflict_test.go
@@ -0,0 +1,704 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "bytes"
+       "fmt"
+       "path/filepath"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/google/uuid"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+// ---------------------------------------------------------------------------
+// helpers
+// ---------------------------------------------------------------------------
+
+// partitionedConflictMeta builds format-version-2 table metadata whose
+// partition spec has a single identity field (field ID 1000) over the
+// schema column partColID, named after that column.
+//
+// With a nil branchHead the snapshot-less metadata is returned. Otherwise
+// one append snapshot with ID *branchHead (sequence number 1) is added and
+// set as the head of the main branch.
+func partitionedConflictMeta(t *testing.T, schema *iceberg.Schema, partColID int, branchHead *int64) Metadata {
+       t.Helper()
+
+       sourceCol, found := schema.FindFieldByID(partColID)
+       require.True(t, found)
+
+       identityField := iceberg.PartitionField{
+               FieldID:   1000,
+               SourceIDs: []int{partColID},
+               Name:      sourceCol.Name,
+               Transform: iceberg.IdentityTransform{},
+       }
+       spec := iceberg.NewPartitionSpec(identityField)
+
+       tableProps := iceberg.Properties{PropertyFormatVersion: "2"}
+       base, err := NewMetadata(schema, &spec, UnsortedSortOrder, "file:///tmp/conflict-part-test", tableProps)
+       require.NoError(t, err)
+
+       if branchHead == nil {
+               return base
+       }
+
+       mb, err := MetadataBuilderFromBase(base, "")
+       require.NoError(t, err)
+
+       headSnap := Snapshot{
+               SnapshotID:     *branchHead,
+               SequenceNumber: 1,
+               TimestampMs:    base.LastUpdatedMillis() + 1,
+               Summary:        &Summary{Operation: OpAppend},
+       }
+       require.NoError(t, mb.AddSnapshot(&headSnap))
+       require.NoError(t, mb.SetSnapshotRef(MainBranch, *branchHead, BranchRef))
+
+       withHead, err := mb.Build()
+       require.NoError(t, err)
+
+       return withHead
+}
+
+// writeTestManifest writes a version-2 manifest containing one ADDED
+// data-file entry and returns the resulting ManifestFile.
+//
+// The entry describes a Parquet data file at dataFilePath carrying
+// partitionValues under spec, attributed to snapshotID with sequence
+// number 1. The manifest is encoded in memory first and then stored via
+// iceio.LocalFS at <dir>/manifest-<snapshotID>.avro.
+func writeTestManifest(
+       t *testing.T,
+       dir string,
+       spec iceberg.PartitionSpec,
+       schema *iceberg.Schema,
+       snapshotID int64,
+       partitionValues map[int]any,
+       dataFilePath string,
+) iceberg.ManifestFile {
+       t.Helper()
+
+       fileBuilder, err := iceberg.NewDataFileBuilder(
+               spec, iceberg.EntryContentData, dataFilePath, iceberg.ParquetFile,
+               partitionValues, nil, nil, 1, 1024,
+       )
+       require.NoError(t, err)
+
+       entryBuilder := iceberg.NewManifestEntryBuilder(iceberg.EntryStatusADDED, &snapshotID, fileBuilder.Build())
+       added := entryBuilder.SequenceNum(1).Build()
+
+       manifestName := fmt.Sprintf("manifest-%d.avro", snapshotID)
+       manifestPath := filepath.ToSlash(filepath.Join(dir, manifestName))
+
+       var encoded bytes.Buffer
+       manifest, err := iceberg.WriteManifest(manifestPath, &encoded, 2, spec, schema, snapshotID, []iceberg.ManifestEntry{added})
+       require.NoError(t, err)
+
+       // Persist the encoded bytes so the manifest can be read back from disk.
+       localFS := iceio.LocalFS{}
+       out, err := localFS.Create(manifestPath)
+       require.NoError(t, err)
+       _, err = out.Write(encoded.Bytes())
+       require.NoError(t, err)
+       require.NoError(t, out.Close())
+
+       return manifest
+}
+
+// writeTestManifestList writes a version-2 manifest list for snapshotID
+// that references manifests, and returns the path it was written to:
+// <dir>/snap-<snapshotID>-manifests.avro. The list is written with no
+// parent snapshot and sequence number 1.
+func writeTestManifestList(
+       t *testing.T,
+       dir string,
+       snapshotID int64,
+       manifests []iceberg.ManifestFile,
+) string {
+       t.Helper()
+
+       listName := fmt.Sprintf("snap-%d-manifests.avro", snapshotID)
+       listPath := filepath.ToSlash(filepath.Join(dir, listName))
+
+       localFS := iceio.LocalFS{}
+       out, err := localFS.Create(listPath)
+       require.NoError(t, err)
+
+       sequence := int64(1)
+       require.NoError(t, iceberg.WriteManifestList(2, out, snapshotID, nil, &sequence, 0, manifests))
+       require.NoError(t, out.Close())
+
+       return listPath
+}
+
+// buildPartitionedContext returns a conflictContext for the main branch in
+// which writerBaseMeta is the writer's base and the "current" metadata is
+// writerBaseMeta plus one concurrent append snapshot.
+//
+// The concurrent snapshot has ID concSnapshotID, parent writerBaseID,
+// sequence number 2, and points at manifestListPath; it is set as the new
+// head of the main branch. Tests built on this helper expect the returned
+// context to have ctx.concurrent == [concSnapshot].
+func buildPartitionedContext(
+       t *testing.T,
+       writerBaseMeta Metadata,
+       manifestListPath string,
+       writerBaseID int64,
+       concSnapshotID int64,
+) *conflictContext {
+       t.Helper()
+
+       mb, err := MetadataBuilderFromBase(writerBaseMeta, "")
+       require.NoError(t, err)
+
+       concurrent := Snapshot{
+               SnapshotID:       concSnapshotID,
+               ParentSnapshotID: &writerBaseID,
+               SequenceNumber:   2,
+               TimestampMs:      writerBaseMeta.LastUpdatedMillis() + 2,
+               ManifestList:     manifestListPath,
+               Summary:          &Summary{Operation: OpAppend},
+       }
+       require.NoError(t, mb.AddSnapshot(&concurrent))
+       require.NoError(t, mb.SetSnapshotRef(MainBranch, concSnapshotID, BranchRef))
+
+       currentMeta, err := mb.Build()
+       require.NoError(t, err)
+
+       cc, err := newConflictContext(writerBaseMeta, currentMeta, MainBranch, iceio.LocalFS{}, true)
+       require.NoError(t, err)
+
+       return cc
+}
+
+// ---------------------------------------------------------------------------
+// anyToLiteral
+// ---------------------------------------------------------------------------
+
+func TestAnyToLiteral_SupportedTypes(t *testing.T) {
+       cases := []struct {
+               name string
+               v    any
+       }{
+               {"bool", true},
+               {"int32", int32(42)},
+               {"int64", int64(42)},
+               {"float32", float32(1.5)},
+               {"float64", float64(1.5)},
+               {"string", "hello"},
+               {"bytes", []byte{0x01}},
+               {"Date", iceberg.Date(100)},
+               {"Time", iceberg.Time(1000)},
+               {"Timestamp", iceberg.Timestamp(9999)},
+               // TimestampNano: arm in anyToLiteral was unreachable from the 
manifest read path
+               // because convertAvroValueToIcebergType had no timestamp-nanos 
case; now fixed.
+               {"TimestampNano", iceberg.TimestampNano(9999)},
+               {"UUID", 
uuid.MustParse("550e8400-e29b-41d4-a716-446655440000")},
+               // DecimalLiteral is the named type returned by 
convertAvroValueToIcebergType
+               // (type DecimalLiteral Decimal). It must be accepted without 
falling through
+               // to the default error branch.
+               {"DecimalLiteral", iceberg.DecimalLiteral{Scale: 2}},
+       }
+
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       lit, err := anyToLiteral(tc.v)
+                       require.NoError(t, err)
+                       assert.NotNil(t, lit)
+               })
+       }
+}
+
+func TestAnyToLiteral_UnsupportedType(t *testing.T) {
+       _, err := anyToLiteral(struct{}{})
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "unsupported partition value type")
+}
+
+// ---------------------------------------------------------------------------
+// validateNoConflictingDataFilesInPartitions short-circuit paths
+// ---------------------------------------------------------------------------
+
+// TestValidateNoConflictingDataFilesInPartitions_SnapshotIsolationIsNoOp
+// verifies that the validator returns nil under IsolationSnapshot even
+// when a partitioned equality-delete file is supplied.
+func TestValidateNoConflictingDataFilesInPartitions_SnapshotIsolationIsNoOp(t *testing.T) {
+       regionSchema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "region", Type: iceberg.PrimitiveTypes.String, Required: true},
+       )
+       headID := int64(1)
+       tableMeta := partitionedConflictMeta(t, regionSchema, 1, &headID)
+
+       // The same metadata serves as both the writer's base and the current state.
+       cc, err := newConflictContext(tableMeta, tableMeta, MainBranch, nil, true)
+       require.NoError(t, err)
+
+       partSpec := tableMeta.PartitionSpec()
+       tuple := make(map[int]any)
+       for _, field := range partSpec.Fields() {
+               tuple[field.FieldID] = "us-east-1"
+       }
+
+       eqDelete, err := iceberg.NewDataFileBuilder(
+               partSpec, iceberg.EntryContentEqDeletes,
+               "s3://bucket/eq-del.parquet", iceberg.ParquetFile,
+               tuple, nil, nil, 1, 1024,
+       )
+       require.NoError(t, err)
+
+       result := validateNoConflictingDataFilesInPartitions(cc, []iceberg.DataFile{eqDelete.Build()}, IsolationSnapshot)
+       require.NoError(t, result)
+}
+
+// TestValidateNoConflictingDataFilesInPartitions_EmptyInputsNoOp verifies
+// that the validator returns nil under IsolationSerializable when given a
+// nil or an empty slice of equality-delete files.
+func TestValidateNoConflictingDataFilesInPartitions_EmptyInputsNoOp(t *testing.T) {
+       regionSchema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "region", Type: iceberg.PrimitiveTypes.String, Required: true},
+       )
+       headID := int64(1)
+       tableMeta := partitionedConflictMeta(t, regionSchema, 1, &headID)
+
+       cc, err := newConflictContext(tableMeta, tableMeta, MainBranch, nil, true)
+       require.NoError(t, err)
+
+       // First a nil slice, then an allocated but empty one.
+       emptyInputs := [][]iceberg.DataFile{nil, {}}
+       for _, files := range emptyInputs {
+               require.NoError(t, validateNoConflictingDataFilesInPartitions(cc, files, IsolationSerializable))
+       }
+}
+
+// ---------------------------------------------------------------------------
+// Regression #978: different-partition concurrent append must NOT be rejected
+// ---------------------------------------------------------------------------
+
+func TestRowDeltaValidate_DifferentPartitionAllowed(t *testing.T) {
+       dir := filepath.ToSlash(t.TempDir())
+
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "region", Type: 
iceberg.PrimitiveTypes.String, Required: true},
+       )
+
+       writerBaseID := int64(99)
+       baseMeta := partitionedConflictMeta(t, schema, 2, &writerBaseID)
+       concSnapshotID := int64(100)
+
+       spec := baseMeta.PartitionSpec()
+
+       euPartition := map[int]any{}
+       for _, pf := range spec.Fields() {
+               euPartition[pf.FieldID] = "eu-west-1"
+       }
+       mf := writeTestManifest(t, dir, spec, schema, concSnapshotID, 
euPartition, dir+"/eu-data.parquet")
+       listPath := writeTestManifestList(t, dir, concSnapshotID, 
[]iceberg.ManifestFile{mf})
+
+       ctx := buildPartitionedContext(t, baseMeta, listPath, writerBaseID, 
concSnapshotID)
+       require.Len(t, ctx.concurrent, 1)
+
+       usPartition := map[int]any{}
+       for _, pf := range spec.Fields() {
+               usPartition[pf.FieldID] = "us-east-1"
+       }
+       eqDf, err := iceberg.NewDataFileBuilder(
+               spec, iceberg.EntryContentEqDeletes,
+               dir+"/us-eq-del.parquet", iceberg.ParquetFile,
+               usPartition, nil, nil, 1, 1024,
+       )
+       require.NoError(t, err)
+
+       err = validateNoConflictingDataFilesInPartitions(ctx, 
[]iceberg.DataFile{eqDf.Build()}, IsolationSerializable)
+       assert.NoError(t, err, "different-partition concurrent append must not 
be rejected")
+}
+
+// ---------------------------------------------------------------------------
+// Same-partition concurrent append MUST be rejected
+// ---------------------------------------------------------------------------
+
+// TestRowDeltaValidate_SamePartitionRejected covers the scenario where the
+// concurrent append and the equality-delete file share the partition
+// region = "us-east-1". The validator must fail with
+// ErrConflictingDataFiles.
+func TestRowDeltaValidate_SamePartitionRejected(t *testing.T) {
+       tmp := filepath.ToSlash(t.TempDir())
+
+       tableSchema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "region", Type: iceberg.PrimitiveTypes.String, Required: true},
+       )
+
+       baseID, concurrentID := int64(199), int64(200)
+       baseMeta := partitionedConflictMeta(t, tableSchema, 2, &baseID)
+       partSpec := baseMeta.PartitionSpec()
+
+       // The same tuple is used for the concurrent data file and the eq-delete.
+       sharedTuple := make(map[int]any)
+       for _, field := range partSpec.Fields() {
+               sharedTuple[field.FieldID] = "us-east-1"
+       }
+
+       manifest := writeTestManifest(t, tmp, partSpec, tableSchema, concurrentID, sharedTuple, tmp+"/us-data.parquet")
+       manifestList := writeTestManifestList(t, tmp, concurrentID, []iceberg.ManifestFile{manifest})
+
+       cc := buildPartitionedContext(t, baseMeta, manifestList, baseID, concurrentID)
+       require.Len(t, cc.concurrent, 1)
+
+       eqDelete, err := iceberg.NewDataFileBuilder(
+               partSpec, iceberg.EntryContentEqDeletes,
+               tmp+"/us-eq-del.parquet", iceberg.ParquetFile,
+               sharedTuple, nil, nil, 1, 1024,
+       )
+       require.NoError(t, err)
+
+       err = validateNoConflictingDataFilesInPartitions(cc, []iceberg.DataFile{eqDelete.Build()}, IsolationSerializable)
+       require.Error(t, err)
+       assert.ErrorIs(t, err, ErrConflictingDataFiles)
+}
+
+// ---------------------------------------------------------------------------
+// UUID partition column: same UUID rejected, different UUID allowed
+// ---------------------------------------------------------------------------
+
+// TestRowDeltaValidate_UUIDPartitionSameRejected partitions the table by a
+// UUID column and places both the concurrent data file and the
+// equality-delete file in the same tenant partition. The validator must
+// fail with ErrConflictingDataFiles.
+func TestRowDeltaValidate_UUIDPartitionSameRejected(t *testing.T) {
+       tmp := filepath.ToSlash(t.TempDir())
+
+       tableSchema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "tenant", Type: iceberg.PrimitiveTypes.UUID, Required: true},
+       )
+
+       baseID, concurrentID := int64(299), int64(300)
+       baseMeta := partitionedConflictMeta(t, tableSchema, 2, &baseID)
+       partSpec := baseMeta.PartitionSpec()
+
+       tenant := uuid.MustParse("aaaaaaaa-0000-0000-0000-000000000001")
+       tenantTuple := make(map[int]any)
+       for _, field := range partSpec.Fields() {
+               tenantTuple[field.FieldID] = tenant
+       }
+
+       manifest := writeTestManifest(t, tmp, partSpec, tableSchema, concurrentID, tenantTuple, tmp+"/tenant-a-data.parquet")
+       manifestList := writeTestManifestList(t, tmp, concurrentID, []iceberg.ManifestFile{manifest})
+
+       cc := buildPartitionedContext(t, baseMeta, manifestList, baseID, concurrentID)
+
+       eqDelete, err := iceberg.NewDataFileBuilder(
+               partSpec, iceberg.EntryContentEqDeletes,
+               tmp+"/tenant-a-eq-del.parquet", iceberg.ParquetFile,
+               tenantTuple, nil, nil, 1, 1024,
+       )
+       require.NoError(t, err)
+
+       err = validateNoConflictingDataFilesInPartitions(cc, []iceberg.DataFile{eqDelete.Build()}, IsolationSerializable)
+       require.Error(t, err, "same UUID partition must conflict")
+       assert.ErrorIs(t, err, ErrConflictingDataFiles)
+}
+
+// TestRowDeltaValidate_UUIDPartitionDifferentAllowed partitions the table
+// by a UUID column, appends a concurrent data file for tenant A, and
+// validates an equality-delete file for tenant B. The validator must not
+// report a conflict.
+func TestRowDeltaValidate_UUIDPartitionDifferentAllowed(t *testing.T) {
+       tmp := filepath.ToSlash(t.TempDir())
+
+       tableSchema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "tenant", Type: iceberg.PrimitiveTypes.UUID, Required: true},
+       )
+
+       baseID, concurrentID := int64(300), int64(301)
+       baseMeta := partitionedConflictMeta(t, tableSchema, 2, &baseID)
+       partSpec := baseMeta.PartitionSpec()
+
+       appendTenant := uuid.MustParse("aaaaaaaa-0000-0000-0000-000000000001")
+       deleteTenant := uuid.MustParse("bbbbbbbb-0000-0000-0000-000000000002")
+
+       appendTuple := make(map[int]any)
+       deleteTuple := make(map[int]any)
+       for _, field := range partSpec.Fields() {
+               appendTuple[field.FieldID] = appendTenant
+               deleteTuple[field.FieldID] = deleteTenant
+       }
+
+       manifest := writeTestManifest(t, tmp, partSpec, tableSchema, concurrentID, appendTuple, tmp+"/tenant-a-data.parquet")
+       manifestList := writeTestManifestList(t, tmp, concurrentID, []iceberg.ManifestFile{manifest})
+
+       cc := buildPartitionedContext(t, baseMeta, manifestList, baseID, concurrentID)
+
+       eqDelete, err := iceberg.NewDataFileBuilder(
+               partSpec, iceberg.EntryContentEqDeletes,
+               tmp+"/tenant-b-eq-del.parquet", iceberg.ParquetFile,
+               deleteTuple, nil, nil, 1, 1024,
+       )
+       require.NoError(t, err)
+
+       err = validateNoConflictingDataFilesInPartitions(cc, []iceberg.DataFile{eqDelete.Build()}, IsolationSerializable)
+       assert.NoError(t, err, "different UUID partition must not conflict")
+}
+
+// ---------------------------------------------------------------------------
+// Unpartitioned table: empty partition map triggers AlwaysTrue fallback
+// ---------------------------------------------------------------------------
+
+func TestRowDeltaValidate_UnpartitionedTableFallsBackToAlwaysTrue(t 
*testing.T) {
+       dir := filepath.ToSlash(t.TempDir())
+
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+
+       props := iceberg.Properties{PropertyFormatVersion: "2"}
+       emptyMeta, err := NewMetadata(schema, iceberg.UnpartitionedSpec, 
UnsortedSortOrder, "file:///tmp/unpart-test", props)
+       require.NoError(t, err)
+
+       // Writer's base: an initial snapshot already committed.
+       writerBaseID := int64(399)
+       builderBase, err := MetadataBuilderFromBase(emptyMeta, "")
+       require.NoError(t, err)
+       baseSnap := Snapshot{
+               SnapshotID:     writerBaseID,
+               SequenceNumber: 1,
+               TimestampMs:    emptyMeta.LastUpdatedMillis() + 1,
+               Summary:        &Summary{Operation: OpAppend},
+       }
+       require.NoError(t, builderBase.AddSnapshot(&baseSnap))
+       require.NoError(t, builderBase.SetSnapshotRef(MainBranch, writerBaseID, 
BranchRef))
+       baseMeta, err := builderBase.Build()
+       require.NoError(t, err)
+
+       concSnapshotID := int64(400)
+       spec := baseMeta.PartitionSpec()
+
+       mf := writeTestManifest(t, dir, spec, schema, concSnapshotID, nil, 
dir+"/unpart-data.parquet")
+       listPath := writeTestManifestList(t, dir, concSnapshotID, 
[]iceberg.ManifestFile{mf})
+
+       ctx := buildPartitionedContext(t, baseMeta, listPath, writerBaseID, 
concSnapshotID)
+       require.Len(t, ctx.concurrent, 1)
+
+       eqDf, err := iceberg.NewDataFileBuilder(
+               spec, iceberg.EntryContentEqDeletes,
+               dir+"/unpart-eq-del.parquet", iceberg.ParquetFile,
+               nil, nil, nil, 1, 1024,
+       )
+       require.NoError(t, err)
+
+       err = validateNoConflictingDataFilesInPartitions(ctx, 
[]iceberg.DataFile{eqDf.Build()}, IsolationSerializable)
+       require.Error(t, err, "unpartitioned table with concurrent append must 
conflict under AlwaysTrue fallback")
+       assert.ErrorIs(t, err, ErrConflictingDataFiles)
+}
+
+// ---------------------------------------------------------------------------
+// Spec evolution: eq-delete under spec A, concurrent data under spec B
+// (renamed partition field, same source field) — conflict must still be 
detected
+// ---------------------------------------------------------------------------
+
+func TestRowDeltaValidate_SpecEvolutionConflictDetected(t *testing.T) {
+       dir := filepath.ToSlash(t.TempDir())
+
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "region", Type: 
iceberg.PrimitiveTypes.String, Required: true},
+       )
+
+       // Spec A: identity(region) — the original partition spec.
+       // partitionedConflictMeta assigns specID=0 and partitionFieldID=1000.
+       initialMeta := partitionedConflictMeta(t, schema, 2, nil) // no 
snapshot yet
+       specAID := initialMeta.DefaultPartitionSpec()             // = 0
+
+       // Add spec B to the metadata: same source field, renamed partition 
field
+       // ("region_v2") — simulating a partition-spec evolution.  The builder
+       // assigns it a new specID (1) and a new partitionFieldID (1001).
+       builder, err := MetadataBuilderFromBase(initialMeta, "")
+       require.NoError(t, err)
+
+       specBInput := iceberg.NewPartitionSpec(
+               iceberg.PartitionField{
+                       // FieldID intentionally left 0 (unassigned) so the 
builder assigns a new one.
+                       SourceIDs: []int{2},
+                       Name:      "region_v2",
+                       Transform: iceberg.IdentityTransform{},
+               },
+       )
+       require.NoError(t, builder.AddPartitionSpec(&specBInput, false))
+
+       // spec B ID = specAID + 1 (reuseOrCreateNewPartitionSpecID increments 
from max existing).
+       specBID := specAID + 1
+       require.NoError(t, builder.SetDefaultSpecID(specBID))
+
+       // Writer's base snapshot: committed before the spec evolution was 
applied.
+       writerBaseID := int64(499)
+       baseSnap := Snapshot{
+               SnapshotID:     writerBaseID,
+               SequenceNumber: 1,
+               TimestampMs:    initialMeta.LastUpdatedMillis() + 1,
+               Summary:        &Summary{Operation: OpAppend},
+       }
+       require.NoError(t, builder.AddSnapshot(&baseSnap))
+       require.NoError(t, builder.SetSnapshotRef(MainBranch, writerBaseID, 
BranchRef))
+       writerBaseMeta, err := builder.Build()
+       require.NoError(t, err)
+
+       // Resolve both specs from the built metadata (both must be present).
+       resolvedSpecA := writerBaseMeta.PartitionSpecByID(specAID)
+       require.NotNil(t, resolvedSpecA, "spec A not found in metadata")
+       resolvedSpecB := writerBaseMeta.PartitionSpecByID(specBID)
+       require.NotNil(t, resolvedSpecB, "spec B not found in metadata")
+
+       // Retrieve actual partition field IDs assigned by the builder.
+       var specAFieldID, specBFieldID int
+       for _, pf := range resolvedSpecA.Fields() {
+               specAFieldID = pf.FieldID
+       }
+       for _, pf := range resolvedSpecB.Fields() {
+               specBFieldID = pf.FieldID
+       }
+       require.NotEqual(t, specAFieldID, specBFieldID,
+               "spec A and spec B must have different partition field IDs")
+
+       // Concurrent commit: data file written under spec B, region_v2 = 
"us-east-1".
+       concSnapshotID := int64(500)
+       specBPartition := map[int]any{specBFieldID: "us-east-1"}
+       mf := writeTestManifest(t, dir, *resolvedSpecB, schema, concSnapshotID, 
specBPartition, dir+"/spec-b-data.parquet")
+       listPath := writeTestManifestList(t, dir, concSnapshotID, 
[]iceberg.ManifestFile{mf})
+
+       ctx := buildPartitionedContext(t, writerBaseMeta, listPath, 
writerBaseID, concSnapshotID)
+       require.Len(t, ctx.concurrent, 1)
+
+       // Eq-delete file written under spec A, region = "us-east-1".
+       // Same logical partition value, different spec — cross-spec conflict.
+       specAPartition := map[int]any{specAFieldID: "us-east-1"}
+       eqDf, err := iceberg.NewDataFileBuilder(
+               *resolvedSpecA, iceberg.EntryContentEqDeletes,
+               dir+"/spec-a-eq-del.parquet", iceberg.ParquetFile,
+               specAPartition, nil, nil, 1, 1024,
+       )
+       require.NoError(t, err)
+
+       // eqDeletePartitionsToFilter builds Reference("region") == "us-east-1" 
(row space, spec A).
+       // validateAddedDataFilesMatchingFilter projects this against spec B's 
"region_v2"
+       // (also identity on source "region"), correctly matching the 
concurrent file.
+       err = validateNoConflictingDataFilesInPartitions(ctx, 
[]iceberg.DataFile{eqDf.Build()}, IsolationSerializable)
+       require.Error(t, err, "cross-spec same-partition conflict must be 
detected after spec evolution")
+       assert.ErrorIs(t, err, ErrConflictingDataFiles)
+}
+
+// ---------------------------------------------------------------------------
+// Non-identity transforms: bucket and day fall back to AlwaysTrue
+// ---------------------------------------------------------------------------
+
+// TestRowDeltaValidate_BucketTransformFallsBackToAlwaysTrue verifies that a
+// bucket[N]-partitioned eq-delete triggers the AlwaysTrue conservative 
fallback.
+// Bucket partition values stored in DataFile.Partition() are post-transform
+// (bucket indices); building a row-space predicate from them would cause
+// double-transformation downstream. The safe path is to treat the eq-delete as
+// table-wide (AlwaysTrue), which rejects any concurrent data file.
+func TestRowDeltaValidate_BucketTransformFallsBackToAlwaysTrue(t *testing.T) {
+       dir := filepath.ToSlash(t.TempDir())
+
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "user_id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+
+       partSpec := iceberg.NewPartitionSpec(
+               iceberg.PartitionField{
+                       FieldID:   1000,
+                       SourceIDs: []int{2},
+                       Name:      "user_id_bucket",
+                       Transform: iceberg.BucketTransform{NumBuckets: 16},
+               },
+       )
+       props := iceberg.Properties{PropertyFormatVersion: "2"}
+       meta, err := NewMetadata(schema, &partSpec, UnsortedSortOrder, 
"file:///tmp/bucket-test", props)
+       require.NoError(t, err)
+
+       writerBaseID := int64(600)
+       b, err := MetadataBuilderFromBase(meta, "")
+       require.NoError(t, err)
+       require.NoError(t, b.AddSnapshot(&Snapshot{
+               SnapshotID: writerBaseID, SequenceNumber: 1,
+               TimestampMs: meta.LastUpdatedMillis() + 1,
+               Summary:     &Summary{Operation: OpAppend},
+       }))
+       require.NoError(t, b.SetSnapshotRef(MainBranch, writerBaseID, 
BranchRef))
+       baseMeta, err := b.Build()
+       require.NoError(t, err)
+
+       spec := baseMeta.PartitionSpec()
+       var bucketFieldID int
+       for _, pf := range spec.Fields() {
+               bucketFieldID = pf.FieldID
+       }
+
+       // Concurrent commit: data file in bucket 1.
+       concSnapshotID := int64(601)
+       mf := writeTestManifest(t, dir, spec, schema, concSnapshotID,
+               map[int]any{bucketFieldID: int32(1)}, 
dir+"/bucket1-data.parquet")
+       listPath := writeTestManifestList(t, dir, concSnapshotID, 
[]iceberg.ManifestFile{mf})
+       ctx := buildPartitionedContext(t, baseMeta, listPath, writerBaseID, 
concSnapshotID)
+       require.Len(t, ctx.concurrent, 1)
+
+       // Eq-delete in bucket 1 — same bucket, but non-identity transform 
forces
+       // AlwaysTrue fallback; concurrent file must still be rejected.
+       eqDf, err := iceberg.NewDataFileBuilder(
+               spec, iceberg.EntryContentEqDeletes,
+               dir+"/bucket1-eq-del.parquet", iceberg.ParquetFile,
+               map[int]any{bucketFieldID: int32(1)}, nil, nil, 1, 1024,
+       )
+       require.NoError(t, err)
+
+       err = validateNoConflictingDataFilesInPartitions(ctx, 
[]iceberg.DataFile{eqDf.Build()}, IsolationSerializable)
+       require.Error(t, err, "bucket-partitioned eq-delete must trigger 
conservative AlwaysTrue fallback")
+       assert.ErrorIs(t, err, ErrConflictingDataFiles)
+}
+
+// TestRowDeltaValidate_DayTransformFallsBackToAlwaysTrue verifies that a
+// day(ts)-partitioned eq-delete also triggers the AlwaysTrue fallback — even
+// when the concurrent data file is in a different day. The code cannot safely
+// reverse a day transform to obtain a row-space bound, so it must be 
conservative.
+func TestRowDeltaValidate_DayTransformFallsBackToAlwaysTrue(t *testing.T) {
+       dir := filepath.ToSlash(t.TempDir())
+
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "event_ts", Type: 
iceberg.PrimitiveTypes.Timestamp, Required: true},
+       )
+
+       partSpec := iceberg.NewPartitionSpec(
+               iceberg.PartitionField{
+                       FieldID:   1000,
+                       SourceIDs: []int{2},
+                       Name:      "event_day",
+                       Transform: iceberg.DayTransform{},
+               },
+       )
+       props := iceberg.Properties{PropertyFormatVersion: "2"}
+       meta, err := NewMetadata(schema, &partSpec, UnsortedSortOrder, 
"file:///tmp/day-test", props)
+       require.NoError(t, err)
+
+       writerBaseID := int64(700)
+       b, err := MetadataBuilderFromBase(meta, "")
+       require.NoError(t, err)
+       require.NoError(t, b.AddSnapshot(&Snapshot{
+               SnapshotID: writerBaseID, SequenceNumber: 1,
+               TimestampMs: meta.LastUpdatedMillis() + 1,
+               Summary:     &Summary{Operation: OpAppend},
+       }))
+       require.NoError(t, b.SetSnapshotRef(MainBranch, writerBaseID, 
BranchRef))
+       baseMeta, err := b.Build()
+       require.NoError(t, err)
+
+       spec := baseMeta.PartitionSpec()
+       var dayFieldID int
+       for _, pf := range spec.Fields() {
+               dayFieldID = pf.FieldID
+       }
+
+       // Concurrent commit: data file in day 100 (days since epoch).
+       concSnapshotID := int64(701)
+       mf := writeTestManifest(t, dir, spec, schema, concSnapshotID,
+               map[int]any{dayFieldID: int32(100)}, dir+"/day100-data.parquet")
+       listPath := writeTestManifestList(t, dir, concSnapshotID, 
[]iceberg.ManifestFile{mf})
+       ctx := buildPartitionedContext(t, baseMeta, listPath, writerBaseID, 
concSnapshotID)
+       require.Len(t, ctx.concurrent, 1)
+
+       // Eq-delete in day 200 — a different day, but non-identity transform 
means
+       // the code falls back to AlwaysTrue and cannot distinguish the days;
+       // concurrent file must still be rejected (conservative correctness).
+       eqDf, err := iceberg.NewDataFileBuilder(
+               spec, iceberg.EntryContentEqDeletes,
+               dir+"/day200-eq-del.parquet", iceberg.ParquetFile,
+               map[int]any{dayFieldID: int32(200)}, nil, nil, 1, 1024,
+       )
+       require.NoError(t, err)
+
+       err = validateNoConflictingDataFilesInPartitions(ctx, 
[]iceberg.DataFile{eqDf.Build()}, IsolationSerializable)
+       require.Error(t, err, "day-partitioned eq-delete must trigger 
conservative AlwaysTrue fallback even for a different day")
+       assert.ErrorIs(t, err, ErrConflictingDataFiles)
+}
diff --git a/table/row_delta.go b/table/row_delta.go
index f73b45a3..2f7de4f1 100644
--- a/table/row_delta.go
+++ b/table/row_delta.go
@@ -44,11 +44,13 @@ import (
 //   - Position deletes: referenced data files must still be reachable
 //     from the current branch head (validateDataFilesExist).
 //   - Equality deletes under write.delete.isolation-level=serializable
-//     (the default): any concurrent commit that added data files is
-//     rejected. This is conservative — the filter derived from the
-//     eq-delete predicate is not yet threaded in, so the current
-//     check uses AlwaysTrue and may reject concurrent appends that
-//     could not actually match the predicate. Opt out by setting
+//     (the default): concurrent data files in the same partition(s) as
+//     the equality deletes are rejected. With identity partitioning an
+//     OR-of-equalities filter is built from the eq-delete files'
+//     partition tuples and routed through validateAddedDataFilesMatchingFilter
+//     (spec-evolution safe, manifest-summary pruning, type-aware evaluation).
+//     Unpartitioned tables and non-identity transforms (e.g. bucket, day)
+//     use AlwaysTrue: any concurrent append conflicts. Opt out by setting
 //     write.delete.isolation-level=snapshot.
 //
 // Refresh-and-replay between retries is deferred to a follow-up PR;
@@ -189,10 +191,14 @@ func (rd *RowDelta) Commit(ctx context.Context) error {
 //
 //   - When any equality-delete is included and isolation is
 //     SERIALIZABLE, reject the commit if a concurrent snapshot added
-//     data files (under any partition, using AlwaysTrue as the
-//     conservative filter). Java refines this with the derived
-//     eq-delete filter; a follow-up can do the same once RowDelta
-//     carries the bound predicate.
+//     conflicting data files. Unpartitioned tables and non-identity
+//     transforms (e.g. bucket, day) use AlwaysTrue: any append conflicts.
+//     With identity partitioning, an OR-of-equalities filter is built from
+//     the eq-delete files' partition tuples and routed through
+//     validateAddedDataFilesMatchingFilter, which performs per-spec
+//     projection (spec-evolution safe), manifest-summary pruning, and
+//     type-aware partition evaluation — so only concurrent data files
+//     in the same partitions as the equality deletes are rejected.
 //
 // Fast appends alongside a RowDelta see no validators from RowDelta:
 // data-only commits are as safe as a fastAppend.
@@ -209,7 +215,7 @@ func (rd *RowDelta) validate(cc *conflictContext) error {
        // matching Java's behavior when the referenced-file column is
        // unset.
        var referenced []string
-       var hasEqDeletes bool
+       var eqDeleteFiles []iceberg.DataFile
        for _, f := range rd.delFiles {
                switch f.ContentType() {
                case iceberg.EntryContentPosDeletes:
@@ -217,7 +223,7 @@ func (rd *RowDelta) validate(cc *conflictContext) error {
                                referenced = append(referenced, *ref)
                        }
                case iceberg.EntryContentEqDeletes:
-                       hasEqDeletes = true
+                       eqDeleteFiles = append(eqDeleteFiles, f)
                }
        }
 
@@ -227,16 +233,28 @@ func (rd *RowDelta) validate(cc *conflictContext) error {
                }
        }
 
-       if hasEqDeletes {
+       if len(eqDeleteFiles) > 0 {
                level := readIsolationLevel(rd.txn.meta.props,
                        WriteDeleteIsolationLevelKey, 
WriteDeleteIsolationLevelDefault)
-               // Conservative: eq-deletes apply by predicate, and RowDelta
-               // does not yet surface the bound predicate. AlwaysTrue is the
-               // safest over-approximation and matches PR 2.3's contract on
-               // validateNoConflictingDataFiles under SERIALIZABLE. Follow-up:
-               // narrow with the actual eq-delete filter once it is carried
-               // on the RowDelta.
-               if err := validateNoConflictingDataFiles(cc, 
iceberg.AlwaysTrue{}, level); err != nil {
+               // Route through the existing validateNoConflictingDataFiles 
path,
+               // which calls validateAddedDataFilesMatchingFilter internally.
+               // For unpartitioned tables, use AlwaysTrue conservatively — an
+               // equality delete can affect any row. For partitioned tables,
+               // build an OR-of-equalities filter from the eq-delete files'
+               // partition tuples so that concurrent appends to different
+               // partitions are not falsely rejected.
+               currentSpec, specErr := rd.txn.meta.CurrentSpec()
+               if specErr != nil {
+                       return fmt.Errorf("reading current partition spec: %w", 
specErr)
+               }
+
+               var err error
+               if currentSpec == nil || currentSpec.NumFields() == 0 {
+                       err = validateNoConflictingDataFiles(cc, 
iceberg.AlwaysTrue{}, level)
+               } else {
+                       err = validateNoConflictingDataFilesInPartitions(cc, 
eqDeleteFiles, level)
+               }
+               if err != nil {
                        return err
                }
        }


Reply via email to