This is an automated email from the ASF dual-hosted git repository.
laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 158255c0 fix(occ): RowDelta uses partition-scoped conflict check
instead of AlwaysTrue (#983)
158255c0 is described below
commit 158255c00913a0adea8f7c30fabfb0b76141b02b
Author: Masa <[email protected]>
AuthorDate: Thu May 7 01:05:46 2026 +1000
fix(occ): RowDelta uses partition-scoped conflict check instead of
AlwaysTrue (#983)
Fixes #978
## Problem
`RowDelta.validate` passes `iceberg.AlwaysTrue{}` to
`validateNoConflictingDataFiles` whenever equality-delete files are
present. This means any concurrent append to **any** partition is
treated as a conflict, even when it lands in a completely different
partition from the equality-deletes. Under serializable isolation this
causes spurious `ErrConflictingDataFiles` errors for workloads that
write to multiple independent partitions concurrently.
## Fix
`RowDelta` now collects the partition tuples of all equality-delete
files it adds (`eqDeletePartitions`). A new validator,
`validateNoConflictingDataFilesInPartitions`, checks concurrent data
files only in those specific partition tuples:
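- If the partition set is empty (no eq-deletes), the check is skipped.
- If any eq-delete is unpartitioned (empty tuple), it falls back to the
conservative `AlwaysTrue` check, preserving existing safety.
- If an eq-delete's partition uses a non-identity transform (bucket,
truncate, year, month, day, hour), that file is also treated
conservatively as `AlwaysTrue`, because its stored post-transform
values cannot be compared in a row-space predicate.
- Otherwise, only concurrent files in a matching partition are flagged.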
## Files changed
- `manifest.go`: convert Avro `timestamp-nanos` values to
`TimestampNano`
- `table/row_delta.go`: collect `eqDeletePartitions` instead of
`hasEqDeletes`
- `table/conflict_validation.go`:
`validateNoConflictingDataFilesInPartitions` +
`eqDeletePartitionsToFilter` + `anyToLiteral`
- `table/partition_conflict_test.go`: unit tests for the new validator
and its helpers
---
manifest.go | 6 +
table/conflict_validation.go | 180 ++++++++++
table/partition_conflict_test.go | 704 +++++++++++++++++++++++++++++++++++++++
table/row_delta.go | 56 ++--
4 files changed, 927 insertions(+), 19 deletions(-)
diff --git a/manifest.go b/manifest.go
index 82de7c1c..c79222d1 100644
--- a/manifest.go
+++ b/manifest.go
@@ -1851,6 +1851,12 @@ func (d *dataFile) convertAvroValueToIcebergType(v any,
fieldID int) any {
}
return Timestamp(v.(int64))
+ case atype.TimestampNanos:
+ if val, ok := v.(time.Time); ok {
+ return TimestampNano(val.UTC().UnixNano())
+ }
+
+ return TimestampNano(v.(int64))
case atype.Decimal:
if r, ok := v.(*big.Rat); ok {
scale := d.fieldIDToFixedSize[fieldID]
diff --git a/table/conflict_validation.go b/table/conflict_validation.go
index 1e12eace..991e6b31 100644
--- a/table/conflict_validation.go
+++ b/table/conflict_validation.go
@@ -53,6 +53,7 @@ import (
"github.com/apache/iceberg-go"
iceio "github.com/apache/iceberg-go/io"
+ "github.com/google/uuid"
)
// IsolationLevel controls how strictly a commit rejects concurrent
@@ -425,6 +426,185 @@ func validateNoConflictingDataFiles(ctx *conflictContext,
filter iceberg.Boolean
return validateAddedDataFilesMatchingFilter(ctx, filter)
}
+// validateNoConflictingDataFilesInPartitions is like
+// validateNoConflictingDataFiles but scoped to the partitions touched
+// by equality-delete files. It builds an OR-of-equalities filter from
+// the provided partition tuples and delegates to
+// validateAddedDataFilesMatchingFilter, which performs per-spec
+// projection, manifest-summary pruning, and type-aware evaluation via
+// iceberg.Literal — making it safe for UUID, decimal, binary, fixed,
+// and future partition types, and correct across partition-spec
+// evolution because each concurrent manifest is projected against its
+// own spec ID.
+//
+// Callers are responsible for ensuring the table is partitioned
+// (i.e. at least one partition field exists) before calling this
+// function. For unpartitioned tables, call
+// validateNoConflictingDataFiles(ctx, iceberg.AlwaysTrue{}, level)
+// directly.
+//
+// Under IsolationSnapshot this validator is a no-op.
+func validateNoConflictingDataFilesInPartitions(ctx *conflictContext,
eqDeleteFiles []iceberg.DataFile, level IsolationLevel) error {
+ if level != IsolationSerializable {
+ return nil
+ }
+
+ if len(ctx.concurrent) == 0 || len(eqDeleteFiles) == 0 {
+ return nil
+ }
+
+ filter, err := eqDeletePartitionsToFilter(eqDeleteFiles, ctx.current)
+ if err != nil {
+ return fmt.Errorf("building partition conflict filter: %w", err)
+ }
+
+ return validateNoConflictingDataFiles(ctx, filter, level)
+}
+
+// eqDeletePartitionsToFilter converts equality-delete data files into an
+// OR-of-ANDs BooleanExpression in row (source) space, suitable for passing
+// to validateAddedDataFilesMatchingFilter.
+//
+// For each eq-delete file it resolves each partition field ID to the source
+// schema field name via the file's partition spec, then builds an EqualTo
+// predicate using Reference(sourceFieldName). Multiple fields within one
+// partition are AND-ed; multiple eq-delete files are OR-ed.
+//
+// The resulting expression is projected per-concurrent-manifest's spec ID
+// inside validateAddedDataFilesMatchingFilter (via buildPartitionProjection),
+// ensuring correct conflict detection even after partition-spec evolution.
+//
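+// An empty partition tuple (unpartitioned delete) returns AlwaysTrue so the
+// caller falls back to the conservative full-table scan. Callers should
+// normally guard against calling this function for unpartitioned tables (see
+// RowDelta.validate). A file whose partition tuple includes a field with a
+// non-identity transform contributes an AlwaysTrue term instead of equality
+// predicates, so the OR-ed filter then matches every partition.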
+func eqDeletePartitionsToFilter(files []iceberg.DataFile, meta Metadata)
(iceberg.BooleanExpression, error) {
+ terms := make([]iceberg.BooleanExpression, 0, len(files))
+ for _, f := range files {
+ p := f.Partition()
+ if len(p) == 0 {
+ return iceberg.AlwaysTrue{}, nil
+ }
+
+ spec := meta.PartitionSpecByID(int(f.SpecID()))
+ if spec == nil {
+ return nil, fmt.Errorf("partition spec ID %d not found
in metadata", f.SpecID())
+ }
+
+ // Build partition field ID → PartitionField lookup for this spec.
+ partFieldByID := make(map[int]iceberg.PartitionField,
spec.NumFields())
+ for _, pf := range spec.Fields() {
+ partFieldByID[pf.FieldID] = pf
+ }
+
+ // Sort partition field IDs for deterministic expression order.
+ fieldIDs := make([]int, 0, len(p))
+ for id := range p {
+ fieldIDs = append(fieldIDs, id)
+ }
+ sort.Ints(fieldIDs)
+
+ // Non-identity transforms (bucket, day, hour, truncate, year, month)
+ // store post-transform values in DataFile.Partition(). Building a
+ // row-space predicate with those values would cause
+ // validateAddedDataFilesMatchingFilter to re-apply the transform,
+ // producing wrong matches (double-transformation). Fall back to
+ // AlwaysTrue (conservative: treat the eq-delete as table-wide) until a
+ // full PartitionSet-style partition-space approach is added.
+ identityOnly := true
+ for _, fid := range fieldIDs {
+ if pf, ok := partFieldByID[fid]; ok {
+ if _, isIdentity :=
pf.Transform.(iceberg.IdentityTransform); !isIdentity {
+ identityOnly = false
+
+ break
+ }
+ }
+ }
+ if !identityOnly {
+ terms = append(terms, iceberg.AlwaysTrue{})
+
+ continue
+ }
+
+ conjuncts := make([]iceberg.BooleanExpression, 0, len(p))
+ for _, partFieldID := range fieldIDs {
+ pf, ok := partFieldByID[partFieldID]
+ if !ok {
+ return nil, fmt.Errorf("partition field ID %d
not found in spec %d", partFieldID, f.SpecID())
+ }
+
+ // Resolve to source schema field to obtain the Reference name.
+ sourceField, ok :=
meta.CurrentSchema().FindFieldByID(pf.SourceID())
+ if !ok {
+ return nil, fmt.Errorf("source field ID %d
(partition field %q) not found in schema", pf.SourceID(), pf.Name)
+ }
+
+ lit, err := anyToLiteral(p[partFieldID])
+ if err != nil {
+ return nil, fmt.Errorf("partition field %q:
%w", sourceField.Name, err)
+ }
+
+ conjuncts = append(conjuncts,
iceberg.LiteralPredicate(iceberg.OpEQ, iceberg.Reference(sourceField.Name),
lit))
+ }
+
+ if len(conjuncts) == 1 {
+ terms = append(terms, conjuncts[0])
+ } else {
+ terms = append(terms, iceberg.NewAnd(conjuncts[0],
conjuncts[1], conjuncts[2:]...))
+ }
+ }
+
+ if len(terms) == 0 {
+ return iceberg.AlwaysTrue{}, nil
+ }
+
+ if len(terms) == 1 {
+ return terms[0], nil
+ }
+
+ return iceberg.NewOr(terms[0], terms[1], terms[2:]...), nil
+}
+
+// anyToLiteral converts a dynamically-typed partition value (as
+// stored in iceberg.DataFile.Partition()) to an iceberg.Literal.
+// The supported types mirror the iceberg.LiteralType constraint.
+func anyToLiteral(v any) (iceberg.Literal, error) {
+ switch val := v.(type) {
+ case bool:
+ return iceberg.NewLiteral(val), nil
+ case int32:
+ return iceberg.NewLiteral(val), nil
+ case int64:
+ return iceberg.NewLiteral(val), nil
+ case float32:
+ return iceberg.NewLiteral(val), nil
+ case float64:
+ return iceberg.NewLiteral(val), nil
+ case string:
+ return iceberg.NewLiteral(val), nil
+ case []byte:
+ return iceberg.NewLiteral(val), nil
+ case iceberg.Date:
+ return iceberg.NewLiteral(val), nil
+ case iceberg.Time:
+ return iceberg.NewLiteral(val), nil
+ case iceberg.Timestamp:
+ return iceberg.NewLiteral(val), nil
+ case iceberg.TimestampNano:
+ return iceberg.NewLiteral(val), nil
+ case iceberg.Decimal:
+ return iceberg.NewLiteral(val), nil
+ case iceberg.DecimalLiteral:
+ // convertAvroValueToIcebergType returns the named type DecimalLiteral
+ // (type DecimalLiteral Decimal), not Decimal itself. Handle both.
+ return iceberg.NewLiteral(iceberg.Decimal(val)), nil
+ case uuid.UUID:
+ return iceberg.NewLiteral(val), nil
+ default:
+ return nil, fmt.Errorf("unsupported partition value type %T", v)
+ }
+}
+
// validateNoNewDeletesForRewrittenFiles rejects the commit if any
// concurrent snapshot added delete files that would be lost when the
// committer's rewrite replaces a data file.
diff --git a/table/partition_conflict_test.go b/table/partition_conflict_test.go
new file mode 100644
index 00000000..aa294724
--- /dev/null
+++ b/table/partition_conflict_test.go
@@ -0,0 +1,704 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "bytes"
+ "fmt"
+ "path/filepath"
+ "testing"
+
+ "github.com/apache/iceberg-go"
+ iceio "github.com/apache/iceberg-go/io"
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// ---------------------------------------------------------------------------
+// helpers
+// ---------------------------------------------------------------------------
+
+func partitionedConflictMeta(t *testing.T, schema *iceberg.Schema, partColID
int, branchHead *int64) Metadata {
+ t.Helper()
+
+ field, ok := schema.FindFieldByID(partColID)
+ require.True(t, ok)
+
+ partSpec := iceberg.NewPartitionSpec(
+ iceberg.PartitionField{
+ FieldID: 1000,
+ SourceIDs: []int{partColID},
+ Name: field.Name,
+ Transform: iceberg.IdentityTransform{},
+ },
+ )
+
+ props := iceberg.Properties{PropertyFormatVersion: "2"}
+ meta, err := NewMetadata(schema, &partSpec, UnsortedSortOrder,
"file:///tmp/conflict-part-test", props)
+ require.NoError(t, err)
+
+ if branchHead == nil {
+ return meta
+ }
+
+ builder, err := MetadataBuilderFromBase(meta, "")
+ require.NoError(t, err)
+ snap := Snapshot{
+ SnapshotID: *branchHead,
+ SequenceNumber: 1,
+ TimestampMs: meta.LastUpdatedMillis() + 1,
+ Summary: &Summary{Operation: OpAppend},
+ }
+ require.NoError(t, builder.AddSnapshot(&snap))
+ require.NoError(t, builder.SetSnapshotRef(MainBranch, *branchHead,
BranchRef))
+ out, err := builder.Build()
+ require.NoError(t, err)
+
+ return out
+}
+
+func writeTestManifest(
+ t *testing.T,
+ dir string,
+ spec iceberg.PartitionSpec,
+ schema *iceberg.Schema,
+ snapshotID int64,
+ partitionValues map[int]any,
+ dataFilePath string,
+) iceberg.ManifestFile {
+ t.Helper()
+
+ df, err := iceberg.NewDataFileBuilder(
+ spec,
+ iceberg.EntryContentData,
+ dataFilePath,
+ iceberg.ParquetFile,
+ partitionValues,
+ nil,
+ nil,
+ 1,
+ 1024,
+ )
+ require.NoError(t, err)
+
+ entry := iceberg.NewManifestEntryBuilder(iceberg.EntryStatusADDED,
&snapshotID, df.Build()).
+ SequenceNum(1).
+ Build()
+
+ manifestPath := filepath.ToSlash(filepath.Join(dir,
fmt.Sprintf("manifest-%d.avro", snapshotID)))
+ var buf bytes.Buffer
+ mf, err := iceberg.WriteManifest(manifestPath, &buf, 2, spec, schema,
snapshotID, []iceberg.ManifestEntry{entry})
+ require.NoError(t, err)
+
+ fs := iceio.LocalFS{}
+ f, err := fs.Create(manifestPath)
+ require.NoError(t, err)
+ _, err = f.Write(buf.Bytes())
+ require.NoError(t, err)
+ require.NoError(t, f.Close())
+
+ return mf
+}
+
+func writeTestManifestList(
+ t *testing.T,
+ dir string,
+ snapshotID int64,
+ manifests []iceberg.ManifestFile,
+) string {
+ t.Helper()
+
+ listPath := filepath.ToSlash(filepath.Join(dir,
fmt.Sprintf("snap-%d-manifests.avro", snapshotID)))
+ fs := iceio.LocalFS{}
+ f, err := fs.Create(listPath)
+ require.NoError(t, err)
+
+ seqNum := int64(1)
+ err = iceberg.WriteManifestList(2, f, snapshotID, nil, &seqNum, 0,
manifests)
+ require.NoError(t, err)
+ require.NoError(t, f.Close())
+
+ return listPath
+}
+
+// buildPartitionedContext creates a conflictContext where the writer's base
+// metadata has a snapshot at writerBaseID, and a concurrent commit at
+// concSnapshotID (pointing at the given manifest list) happened afterward.
+//
+// The conflictContext will have ctx.concurrent == [concSnapshot].
+func buildPartitionedContext(
+ t *testing.T,
+ writerBaseMeta Metadata,
+ manifestListPath string,
+ writerBaseID int64,
+ concSnapshotID int64,
+) *conflictContext {
+ t.Helper()
+
+ // currentMeta: writerBaseMeta + concurrent snapshot with manifest list.
+ builder, err := MetadataBuilderFromBase(writerBaseMeta, "")
+ require.NoError(t, err)
+
+ concSnap := Snapshot{
+ SnapshotID: concSnapshotID,
+ ParentSnapshotID: &writerBaseID,
+ SequenceNumber: 2,
+ TimestampMs: writerBaseMeta.LastUpdatedMillis() + 2,
+ ManifestList: manifestListPath,
+ Summary: &Summary{Operation: OpAppend},
+ }
+ require.NoError(t, builder.AddSnapshot(&concSnap))
+ require.NoError(t, builder.SetSnapshotRef(MainBranch, concSnapshotID,
BranchRef))
+ current, err := builder.Build()
+ require.NoError(t, err)
+
+ ctx, err := newConflictContext(writerBaseMeta, current, MainBranch,
iceio.LocalFS{}, true)
+ require.NoError(t, err)
+
+ return ctx
+}
+
+// ---------------------------------------------------------------------------
+// anyToLiteral
+// ---------------------------------------------------------------------------
+
+func TestAnyToLiteral_SupportedTypes(t *testing.T) {
+ cases := []struct {
+ name string
+ v any
+ }{
+ {"bool", true},
+ {"int32", int32(42)},
+ {"int64", int64(42)},
+ {"float32", float32(1.5)},
+ {"float64", float64(1.5)},
+ {"string", "hello"},
+ {"bytes", []byte{0x01}},
+ {"Date", iceberg.Date(100)},
+ {"Time", iceberg.Time(1000)},
+ {"Timestamp", iceberg.Timestamp(9999)},
+ // TimestampNano: arm in anyToLiteral was unreachable from the manifest read path
+ // because convertAvroValueToIcebergType had no timestamp-nanos case; now fixed.
+ {"TimestampNano", iceberg.TimestampNano(9999)},
+ {"UUID",
uuid.MustParse("550e8400-e29b-41d4-a716-446655440000")},
+ // DecimalLiteral is the named type returned by convertAvroValueToIcebergType
+ // (type DecimalLiteral Decimal). It must be accepted without falling through
+ // to the default error branch.
+ {"DecimalLiteral", iceberg.DecimalLiteral{Scale: 2}},
+ }
+
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ lit, err := anyToLiteral(tc.v)
+ require.NoError(t, err)
+ assert.NotNil(t, lit)
+ })
+ }
+}
+
+func TestAnyToLiteral_UnsupportedType(t *testing.T) {
+ _, err := anyToLiteral(struct{}{})
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "unsupported partition value type")
+}
+
+// ---------------------------------------------------------------------------
+// validateNoConflictingDataFilesInPartitions short-circuit paths
+// ---------------------------------------------------------------------------
+
+func TestValidateNoConflictingDataFilesInPartitions_SnapshotIsolationIsNoOp(t
*testing.T) {
+ schema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "region", Type:
iceberg.PrimitiveTypes.String, Required: true},
+ )
+ head := int64(1)
+ meta := partitionedConflictMeta(t, schema, 1, &head)
+ ctx, err := newConflictContext(meta, meta, MainBranch, nil, true)
+ require.NoError(t, err)
+
+ spec := meta.PartitionSpec()
+ partVals := map[int]any{}
+ for _, pf := range spec.Fields() {
+ partVals[pf.FieldID] = "us-east-1"
+ }
+ df, err := iceberg.NewDataFileBuilder(
+ spec, iceberg.EntryContentEqDeletes,
+ "s3://bucket/eq-del.parquet", iceberg.ParquetFile,
+ partVals, nil, nil, 1, 1024,
+ )
+ require.NoError(t, err)
+
+ require.NoError(t, validateNoConflictingDataFilesInPartitions(ctx,
[]iceberg.DataFile{df.Build()}, IsolationSnapshot))
+}
+
+func TestValidateNoConflictingDataFilesInPartitions_EmptyInputsNoOp(t
*testing.T) {
+ schema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "region", Type:
iceberg.PrimitiveTypes.String, Required: true},
+ )
+ head := int64(1)
+ meta := partitionedConflictMeta(t, schema, 1, &head)
+ ctx, err := newConflictContext(meta, meta, MainBranch, nil, true)
+ require.NoError(t, err)
+
+ require.NoError(t, validateNoConflictingDataFilesInPartitions(ctx, nil,
IsolationSerializable))
+ require.NoError(t, validateNoConflictingDataFilesInPartitions(ctx,
[]iceberg.DataFile{}, IsolationSerializable))
+}
+
+// ---------------------------------------------------------------------------
+// Regression #978: different-partition concurrent append must NOT be rejected
+// ---------------------------------------------------------------------------
+
+func TestRowDeltaValidate_DifferentPartitionAllowed(t *testing.T) {
+ dir := filepath.ToSlash(t.TempDir())
+
+ schema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "region", Type:
iceberg.PrimitiveTypes.String, Required: true},
+ )
+
+ writerBaseID := int64(99)
+ baseMeta := partitionedConflictMeta(t, schema, 2, &writerBaseID)
+ concSnapshotID := int64(100)
+
+ spec := baseMeta.PartitionSpec()
+
+ euPartition := map[int]any{}
+ for _, pf := range spec.Fields() {
+ euPartition[pf.FieldID] = "eu-west-1"
+ }
+ mf := writeTestManifest(t, dir, spec, schema, concSnapshotID,
euPartition, dir+"/eu-data.parquet")
+ listPath := writeTestManifestList(t, dir, concSnapshotID,
[]iceberg.ManifestFile{mf})
+
+ ctx := buildPartitionedContext(t, baseMeta, listPath, writerBaseID,
concSnapshotID)
+ require.Len(t, ctx.concurrent, 1)
+
+ usPartition := map[int]any{}
+ for _, pf := range spec.Fields() {
+ usPartition[pf.FieldID] = "us-east-1"
+ }
+ eqDf, err := iceberg.NewDataFileBuilder(
+ spec, iceberg.EntryContentEqDeletes,
+ dir+"/us-eq-del.parquet", iceberg.ParquetFile,
+ usPartition, nil, nil, 1, 1024,
+ )
+ require.NoError(t, err)
+
+ err = validateNoConflictingDataFilesInPartitions(ctx,
[]iceberg.DataFile{eqDf.Build()}, IsolationSerializable)
+ assert.NoError(t, err, "different-partition concurrent append must not
be rejected")
+}
+
+// ---------------------------------------------------------------------------
+// Same-partition concurrent append MUST be rejected
+// ---------------------------------------------------------------------------
+
+func TestRowDeltaValidate_SamePartitionRejected(t *testing.T) {
+ dir := filepath.ToSlash(t.TempDir())
+
+ schema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "region", Type:
iceberg.PrimitiveTypes.String, Required: true},
+ )
+
+ writerBaseID := int64(199)
+ baseMeta := partitionedConflictMeta(t, schema, 2, &writerBaseID)
+ concSnapshotID := int64(200)
+
+ spec := baseMeta.PartitionSpec()
+
+ usPartition := map[int]any{}
+ for _, pf := range spec.Fields() {
+ usPartition[pf.FieldID] = "us-east-1"
+ }
+ mf := writeTestManifest(t, dir, spec, schema, concSnapshotID,
usPartition, dir+"/us-data.parquet")
+ listPath := writeTestManifestList(t, dir, concSnapshotID,
[]iceberg.ManifestFile{mf})
+
+ ctx := buildPartitionedContext(t, baseMeta, listPath, writerBaseID,
concSnapshotID)
+ require.Len(t, ctx.concurrent, 1)
+
+ eqDf, err := iceberg.NewDataFileBuilder(
+ spec, iceberg.EntryContentEqDeletes,
+ dir+"/us-eq-del.parquet", iceberg.ParquetFile,
+ usPartition, nil, nil, 1, 1024,
+ )
+ require.NoError(t, err)
+
+ err = validateNoConflictingDataFilesInPartitions(ctx,
[]iceberg.DataFile{eqDf.Build()}, IsolationSerializable)
+ require.Error(t, err)
+ assert.ErrorIs(t, err, ErrConflictingDataFiles)
+}
+
+// ---------------------------------------------------------------------------
+// UUID partition column: same UUID rejected, different UUID allowed
+// ---------------------------------------------------------------------------
+
+func TestRowDeltaValidate_UUIDPartitionSameRejected(t *testing.T) {
+ dir := filepath.ToSlash(t.TempDir())
+
+ schema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "tenant", Type:
iceberg.PrimitiveTypes.UUID, Required: true},
+ )
+
+ writerBaseID := int64(299)
+ baseMeta := partitionedConflictMeta(t, schema, 2, &writerBaseID)
+ concSnapshotID := int64(300)
+
+ spec := baseMeta.PartitionSpec()
+ tenantA := uuid.MustParse("aaaaaaaa-0000-0000-0000-000000000001")
+
+ partA := map[int]any{}
+ for _, pf := range spec.Fields() {
+ partA[pf.FieldID] = tenantA
+ }
+
+ mf := writeTestManifest(t, dir, spec, schema, concSnapshotID, partA,
dir+"/tenant-a-data.parquet")
+ listPath := writeTestManifestList(t, dir, concSnapshotID,
[]iceberg.ManifestFile{mf})
+
+ ctx := buildPartitionedContext(t, baseMeta, listPath, writerBaseID,
concSnapshotID)
+
+ eqDf, err := iceberg.NewDataFileBuilder(
+ spec, iceberg.EntryContentEqDeletes,
+ dir+"/tenant-a-eq-del.parquet", iceberg.ParquetFile,
+ partA, nil, nil, 1, 1024,
+ )
+ require.NoError(t, err)
+
+ err = validateNoConflictingDataFilesInPartitions(ctx,
[]iceberg.DataFile{eqDf.Build()}, IsolationSerializable)
+ require.Error(t, err, "same UUID partition must conflict")
+ assert.ErrorIs(t, err, ErrConflictingDataFiles)
+}
+
+func TestRowDeltaValidate_UUIDPartitionDifferentAllowed(t *testing.T) {
+ dir := filepath.ToSlash(t.TempDir())
+
+ schema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "tenant", Type:
iceberg.PrimitiveTypes.UUID, Required: true},
+ )
+
+ writerBaseID := int64(300)
+ baseMeta := partitionedConflictMeta(t, schema, 2, &writerBaseID)
+ concSnapshotID := int64(301)
+
+ spec := baseMeta.PartitionSpec()
+ tenantA := uuid.MustParse("aaaaaaaa-0000-0000-0000-000000000001")
+ tenantB := uuid.MustParse("bbbbbbbb-0000-0000-0000-000000000002")
+
+ concPartition := map[int]any{}
+ eqPartition := map[int]any{}
+ for _, pf := range spec.Fields() {
+ concPartition[pf.FieldID] = tenantA
+ eqPartition[pf.FieldID] = tenantB
+ }
+
+ mf := writeTestManifest(t, dir, spec, schema, concSnapshotID,
concPartition, dir+"/tenant-a-data.parquet")
+ listPath := writeTestManifestList(t, dir, concSnapshotID,
[]iceberg.ManifestFile{mf})
+
+ ctx := buildPartitionedContext(t, baseMeta, listPath, writerBaseID,
concSnapshotID)
+
+ eqDf, err := iceberg.NewDataFileBuilder(
+ spec, iceberg.EntryContentEqDeletes,
+ dir+"/tenant-b-eq-del.parquet", iceberg.ParquetFile,
+ eqPartition, nil, nil, 1, 1024,
+ )
+ require.NoError(t, err)
+
+ err = validateNoConflictingDataFilesInPartitions(ctx,
[]iceberg.DataFile{eqDf.Build()}, IsolationSerializable)
+ assert.NoError(t, err, "different UUID partition must not conflict")
+}
+
+// ---------------------------------------------------------------------------
+// Unpartitioned table: empty partition map triggers AlwaysTrue fallback
+// ---------------------------------------------------------------------------
+
+func TestRowDeltaValidate_UnpartitionedTableFallsBackToAlwaysTrue(t
*testing.T) {
+ dir := filepath.ToSlash(t.TempDir())
+
+ schema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ )
+
+ props := iceberg.Properties{PropertyFormatVersion: "2"}
+ emptyMeta, err := NewMetadata(schema, iceberg.UnpartitionedSpec,
UnsortedSortOrder, "file:///tmp/unpart-test", props)
+ require.NoError(t, err)
+
+ // Writer's base: an initial snapshot already committed.
+ writerBaseID := int64(399)
+ builderBase, err := MetadataBuilderFromBase(emptyMeta, "")
+ require.NoError(t, err)
+ baseSnap := Snapshot{
+ SnapshotID: writerBaseID,
+ SequenceNumber: 1,
+ TimestampMs: emptyMeta.LastUpdatedMillis() + 1,
+ Summary: &Summary{Operation: OpAppend},
+ }
+ require.NoError(t, builderBase.AddSnapshot(&baseSnap))
+ require.NoError(t, builderBase.SetSnapshotRef(MainBranch, writerBaseID,
BranchRef))
+ baseMeta, err := builderBase.Build()
+ require.NoError(t, err)
+
+ concSnapshotID := int64(400)
+ spec := baseMeta.PartitionSpec()
+
+ mf := writeTestManifest(t, dir, spec, schema, concSnapshotID, nil,
dir+"/unpart-data.parquet")
+ listPath := writeTestManifestList(t, dir, concSnapshotID,
[]iceberg.ManifestFile{mf})
+
+ ctx := buildPartitionedContext(t, baseMeta, listPath, writerBaseID,
concSnapshotID)
+ require.Len(t, ctx.concurrent, 1)
+
+ eqDf, err := iceberg.NewDataFileBuilder(
+ spec, iceberg.EntryContentEqDeletes,
+ dir+"/unpart-eq-del.parquet", iceberg.ParquetFile,
+ nil, nil, nil, 1, 1024,
+ )
+ require.NoError(t, err)
+
+ err = validateNoConflictingDataFilesInPartitions(ctx,
[]iceberg.DataFile{eqDf.Build()}, IsolationSerializable)
+ require.Error(t, err, "unpartitioned table with concurrent append must
conflict under AlwaysTrue fallback")
+ assert.ErrorIs(t, err, ErrConflictingDataFiles)
+}
+
+// ---------------------------------------------------------------------------
+// Spec evolution: eq-delete under spec A, concurrent data under spec B
+// (renamed partition field, same source field) — conflict must still be detected
+// ---------------------------------------------------------------------------
+
+func TestRowDeltaValidate_SpecEvolutionConflictDetected(t *testing.T) {
+	dir := filepath.ToSlash(t.TempDir())
+
+	schema := iceberg.NewSchema(0,
+		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
+		iceberg.NestedField{ID: 2, Name: "region", Type: iceberg.PrimitiveTypes.String, Required: true},
+	)
+
+	// Spec A: identity(region) — the original partition spec.
+	// partitionedConflictMeta assigns specID=0 and partitionFieldID=1000.
+	initialMeta := partitionedConflictMeta(t, schema, 2, nil) // no snapshot yet
+	specAID := initialMeta.DefaultPartitionSpec()             // = 0
+
+	// Add spec B to the metadata: same source field, renamed partition field
+	// ("region_v2") — simulating a partition-spec evolution. The builder
+	// assigns it a new specID (1) and a new partitionFieldID (1001).
+	builder, err := MetadataBuilderFromBase(initialMeta, "")
+	require.NoError(t, err)
+
+	specBInput := iceberg.NewPartitionSpec(
+		iceberg.PartitionField{
+			// FieldID intentionally left 0 (unassigned) so the builder assigns a new one.
+			SourceIDs: []int{2},
+			Name:      "region_v2",
+			Transform: iceberg.IdentityTransform{},
+		},
+	)
+	require.NoError(t, builder.AddPartitionSpec(&specBInput, false))
+
+	// spec B ID = specAID + 1 (reuseOrCreateNewPartitionSpecID increments from max existing).
+	specBID := specAID + 1
+	require.NoError(t, builder.SetDefaultSpecID(specBID))
+
+	// Writer's base snapshot: committed before the spec evolution was applied.
+	writerBaseID := int64(499)
+	baseSnap := Snapshot{
+		SnapshotID:     writerBaseID,
+		SequenceNumber: 1,
+		TimestampMs:    initialMeta.LastUpdatedMillis() + 1,
+		Summary:        &Summary{Operation: OpAppend},
+	}
+	require.NoError(t, builder.AddSnapshot(&baseSnap))
+	require.NoError(t, builder.SetSnapshotRef(MainBranch, writerBaseID, BranchRef))
+	writerBaseMeta, err := builder.Build()
+	require.NoError(t, err)
+
+	// Resolve both specs from the built metadata (both must be present).
+	resolvedSpecA := writerBaseMeta.PartitionSpecByID(specAID)
+	require.NotNil(t, resolvedSpecA, "spec A not found in metadata")
+	resolvedSpecB := writerBaseMeta.PartitionSpecByID(specBID)
+	require.NotNil(t, resolvedSpecB, "spec B not found in metadata")
+
+	// Retrieve actual partition field IDs assigned by the builder.
+	var specAFieldID, specBFieldID int
+	for _, pf := range resolvedSpecA.Fields() {
+		specAFieldID = pf.FieldID
+	}
+	for _, pf := range resolvedSpecB.Fields() {
+		specBFieldID = pf.FieldID
+	}
+	require.NotEqual(t, specAFieldID, specBFieldID,
+		"spec A and spec B must have different partition field IDs")
+
+	// Concurrent commit: data file written under spec B, region_v2 = "us-east-1".
+	concSnapshotID := int64(500)
+	specBPartition := map[int]any{specBFieldID: "us-east-1"}
+	mf := writeTestManifest(t, dir, *resolvedSpecB, schema, concSnapshotID, specBPartition, dir+"/spec-b-data.parquet")
+	listPath := writeTestManifestList(t, dir, concSnapshotID, []iceberg.ManifestFile{mf})
+
+	ctx := buildPartitionedContext(t, writerBaseMeta, listPath, writerBaseID, concSnapshotID)
+	require.Len(t, ctx.concurrent, 1)
+
+	// Eq-delete file written under spec A, region = "us-east-1".
+	// Same logical partition value, different spec — cross-spec conflict.
+	specAPartition := map[int]any{specAFieldID: "us-east-1"}
+	eqDf, err := iceberg.NewDataFileBuilder(
+		*resolvedSpecA, iceberg.EntryContentEqDeletes,
+		dir+"/spec-a-eq-del.parquet", iceberg.ParquetFile,
+		specAPartition, nil, nil, 1, 1024,
+	)
+	require.NoError(t, err)
+
+	// eqDeletePartitionsToFilter builds Reference("region") == "us-east-1" (row space, spec A).
+	// validateAddedDataFilesMatchingFilter projects this against spec B's "region_v2"
+	// (also identity on source "region"), correctly matching the concurrent file.
+	err = validateNoConflictingDataFilesInPartitions(ctx, []iceberg.DataFile{eqDf.Build()}, IsolationSerializable)
+	require.Error(t, err, "cross-spec same-partition conflict must be detected after spec evolution")
+	assert.ErrorIs(t, err, ErrConflictingDataFiles)
+}
+
+// ---------------------------------------------------------------------------
+// Non-identity transforms: bucket and day fall back to AlwaysTrue
+// ---------------------------------------------------------------------------
+
+// TestRowDeltaValidate_BucketTransformFallsBackToAlwaysTrue verifies that a
+// bucket[N]-partitioned eq-delete triggers the AlwaysTrue conservative fallback.
+// Bucket partition values stored in DataFile.Partition() are post-transform
+// (bucket indices); building a row-space predicate from them would cause
+// double-transformation downstream. The safe path is to treat the eq-delete as
+// table-wide (AlwaysTrue), which rejects any concurrent data file.
+func TestRowDeltaValidate_BucketTransformFallsBackToAlwaysTrue(t *testing.T) {
+	dir := filepath.ToSlash(t.TempDir())
+
+	schema := iceberg.NewSchema(0,
+		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
+		iceberg.NestedField{ID: 2, Name: "user_id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
+	)
+
+	partSpec := iceberg.NewPartitionSpec(
+		iceberg.PartitionField{
+			FieldID:   1000,
+			SourceIDs: []int{2},
+			Name:      "user_id_bucket",
+			Transform: iceberg.BucketTransform{NumBuckets: 16},
+		},
+	)
+	props := iceberg.Properties{PropertyFormatVersion: "2"}
+	meta, err := NewMetadata(schema, &partSpec, UnsortedSortOrder, "file:///tmp/bucket-test", props)
+	require.NoError(t, err)
+
+	writerBaseID := int64(600)
+	b, err := MetadataBuilderFromBase(meta, "")
+	require.NoError(t, err)
+	require.NoError(t, b.AddSnapshot(&Snapshot{
+		SnapshotID: writerBaseID, SequenceNumber: 1,
+		TimestampMs: meta.LastUpdatedMillis() + 1,
+		Summary:     &Summary{Operation: OpAppend},
+	}))
+	require.NoError(t, b.SetSnapshotRef(MainBranch, writerBaseID, BranchRef))
+	baseMeta, err := b.Build()
+	require.NoError(t, err)
+
+	spec := baseMeta.PartitionSpec()
+	var bucketFieldID int
+	for _, pf := range spec.Fields() {
+		bucketFieldID = pf.FieldID
+	}
+
+	// Concurrent commit: data file in bucket 1.
+	concSnapshotID := int64(601)
+	mf := writeTestManifest(t, dir, spec, schema, concSnapshotID,
+		map[int]any{bucketFieldID: int32(1)}, dir+"/bucket1-data.parquet")
+	listPath := writeTestManifestList(t, dir, concSnapshotID, []iceberg.ManifestFile{mf})
+	ctx := buildPartitionedContext(t, baseMeta, listPath, writerBaseID, concSnapshotID)
+	require.Len(t, ctx.concurrent, 1)
+
+	// Eq-delete in bucket 1 — same bucket, but non-identity transform forces
+	// AlwaysTrue fallback; concurrent file must still be rejected.
+	eqDf, err := iceberg.NewDataFileBuilder(
+		spec, iceberg.EntryContentEqDeletes,
+		dir+"/bucket1-eq-del.parquet", iceberg.ParquetFile,
+		map[int]any{bucketFieldID: int32(1)}, nil, nil, 1, 1024,
+	)
+	require.NoError(t, err)
+
+	err = validateNoConflictingDataFilesInPartitions(ctx, []iceberg.DataFile{eqDf.Build()}, IsolationSerializable)
+	require.Error(t, err, "bucket-partitioned eq-delete must trigger conservative AlwaysTrue fallback")
+	assert.ErrorIs(t, err, ErrConflictingDataFiles)
+}
+
+// TestRowDeltaValidate_DayTransformFallsBackToAlwaysTrue verifies that a
+// day(ts)-partitioned eq-delete also triggers the AlwaysTrue fallback — even
+// when the concurrent data file is in a different day. The code cannot safely
+// reverse a day transform to obtain a row-space bound, so it must be conservative.
+func TestRowDeltaValidate_DayTransformFallsBackToAlwaysTrue(t *testing.T) {
+	dir := filepath.ToSlash(t.TempDir())
+
+	schema := iceberg.NewSchema(0,
+		iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
+		iceberg.NestedField{ID: 2, Name: "event_ts", Type: iceberg.PrimitiveTypes.Timestamp, Required: true},
+	)
+
+	partSpec := iceberg.NewPartitionSpec(
+		iceberg.PartitionField{
+			FieldID:   1000,
+			SourceIDs: []int{2},
+			Name:      "event_day",
+			Transform: iceberg.DayTransform{},
+		},
+	)
+	props := iceberg.Properties{PropertyFormatVersion: "2"}
+	meta, err := NewMetadata(schema, &partSpec, UnsortedSortOrder, "file:///tmp/day-test", props)
+	require.NoError(t, err)
+
+	writerBaseID := int64(700)
+	b, err := MetadataBuilderFromBase(meta, "")
+	require.NoError(t, err)
+	require.NoError(t, b.AddSnapshot(&Snapshot{
+		SnapshotID: writerBaseID, SequenceNumber: 1,
+		TimestampMs: meta.LastUpdatedMillis() + 1,
+		Summary:     &Summary{Operation: OpAppend},
+	}))
+	require.NoError(t, b.SetSnapshotRef(MainBranch, writerBaseID, BranchRef))
+	baseMeta, err := b.Build()
+	require.NoError(t, err)
+
+	spec := baseMeta.PartitionSpec()
+	var dayFieldID int
+	for _, pf := range spec.Fields() {
+		dayFieldID = pf.FieldID
+	}
+
+	// Concurrent commit: data file in day 100 (days since epoch).
+	concSnapshotID := int64(701)
+	mf := writeTestManifest(t, dir, spec, schema, concSnapshotID,
+		map[int]any{dayFieldID: int32(100)}, dir+"/day100-data.parquet")
+	listPath := writeTestManifestList(t, dir, concSnapshotID, []iceberg.ManifestFile{mf})
+	ctx := buildPartitionedContext(t, baseMeta, listPath, writerBaseID, concSnapshotID)
+	require.Len(t, ctx.concurrent, 1)
+
+	// Eq-delete in day 200 — a different day, but non-identity transform means
+	// the code falls back to AlwaysTrue and cannot distinguish the days;
+	// concurrent file must still be rejected (conservative correctness).
+	eqDf, err := iceberg.NewDataFileBuilder(
+		spec, iceberg.EntryContentEqDeletes,
+		dir+"/day200-eq-del.parquet", iceberg.ParquetFile,
+		map[int]any{dayFieldID: int32(200)}, nil, nil, 1, 1024,
+	)
+	require.NoError(t, err)
+
+	err = validateNoConflictingDataFilesInPartitions(ctx, []iceberg.DataFile{eqDf.Build()}, IsolationSerializable)
+	require.Error(t, err, "day-partitioned eq-delete must trigger conservative AlwaysTrue fallback even for a different day")
+	assert.ErrorIs(t, err, ErrConflictingDataFiles)
+}
diff --git a/table/row_delta.go b/table/row_delta.go
index f73b45a3..2f7de4f1 100644
--- a/table/row_delta.go
+++ b/table/row_delta.go
@@ -44,11 +44,13 @@ import (
// - Position deletes: referenced data files must still be reachable
// from the current branch head (validateDataFilesExist).
// - Equality deletes under write.delete.isolation-level=serializable
-// (the default): any concurrent commit that added data files is
-// rejected. This is conservative — the filter derived from the
-// eq-delete predicate is not yet threaded in, so the current
-// check uses AlwaysTrue and may reject concurrent appends that
-// could not actually match the predicate. Opt out by setting
+// (the default): concurrent data files in the same partition(s) as
+// the equality deletes are rejected. For partitioned tables an
+// OR-of-equalities filter is built from the eq-delete files'
+// partition tuples and routed through validateAddedDataFilesMatchingFilter
+// (spec-evolution safe, manifest-summary pruning, type-aware evaluation).
+// For unpartitioned tables the check is conservative (AlwaysTrue —
+// any concurrent append is a conflict). Opt out by setting
// write.delete.isolation-level=snapshot.
//
// Refresh-and-replay between retries is deferred to a follow-up PR;
@@ -189,10 +191,14 @@ func (rd *RowDelta) Commit(ctx context.Context) error {
//
// - When any equality-delete is included and isolation is
// SERIALIZABLE, reject the commit if a concurrent snapshot added
-// data files (under any partition, using AlwaysTrue as the
-// conservative filter). Java refines this with the derived
-// eq-delete filter; a follow-up can do the same once RowDelta
-// carries the bound predicate.
+// conflicting data files. For unpartitioned tables the check is
+// conservative (AlwaysTrue — any concurrent append is a conflict).
+// For partitioned tables, an OR-of-equalities filter is built from
+// the eq-delete files' partition tuples and routed through
+// validateAddedDataFilesMatchingFilter, which performs per-spec
+// projection (spec-evolution safe), manifest-summary pruning, and
+// type-aware partition evaluation — so only concurrent data files
+// in the same partitions as the equality deletes are rejected.
//
// Fast appends alongside a RowDelta see no validators from RowDelta:
// data-only commits are as safe as a fastAppend.
@@ -209,7 +215,7 @@ func (rd *RowDelta) validate(cc *conflictContext) error {
// matching Java's behavior when the referenced-file column is
// unset.
var referenced []string
- var hasEqDeletes bool
+ var eqDeleteFiles []iceberg.DataFile
for _, f := range rd.delFiles {
switch f.ContentType() {
case iceberg.EntryContentPosDeletes:
@@ -217,7 +223,7 @@ func (rd *RowDelta) validate(cc *conflictContext) error {
referenced = append(referenced, *ref)
}
case iceberg.EntryContentEqDeletes:
- hasEqDeletes = true
+ eqDeleteFiles = append(eqDeleteFiles, f)
}
}
@@ -227,16 +233,28 @@ func (rd *RowDelta) validate(cc *conflictContext) error {
}
}
- if hasEqDeletes {
+ if len(eqDeleteFiles) > 0 {
level := readIsolationLevel(rd.txn.meta.props,
WriteDeleteIsolationLevelKey,
WriteDeleteIsolationLevelDefault)
- // Conservative: eq-deletes apply by predicate, and RowDelta
- // does not yet surface the bound predicate. AlwaysTrue is the
- // safest over-approximation and matches PR 2.3's contract on
- // validateNoConflictingDataFiles under SERIALIZABLE. Follow-up:
- // narrow with the actual eq-delete filter once it is carried
- // on the RowDelta.
- if err := validateNoConflictingDataFiles(cc,
iceberg.AlwaysTrue{}, level); err != nil {
+ // Route through the existing validateNoConflictingDataFiles
path,
+ // which calls validateAddedDataFilesMatchingFilter internally.
+ // For unpartitioned tables, use AlwaysTrue conservatively — an
+ // equality delete can affect any row. For partitioned tables,
+ // build an OR-of-equalities filter from the eq-delete files'
+ // partition tuples so that concurrent appends to different
+ // partitions are not falsely rejected.
+ currentSpec, specErr := rd.txn.meta.CurrentSpec()
+ if specErr != nil {
+ return fmt.Errorf("reading current partition spec: %w",
specErr)
+ }
+
+ var err error
+ if currentSpec == nil || currentSpec.NumFields() == 0 {
+ err = validateNoConflictingDataFiles(cc,
iceberg.AlwaysTrue{}, level)
+ } else {
+ err = validateNoConflictingDataFilesInPartitions(cc,
eqDeleteFiles, level)
+ }
+ if err != nil {
return err
}
}