mzzz-zzm opened a new issue, #978:
URL: https://github.com/apache/iceberg-go/issues/978

   ### Apache Iceberg version
   
   main (development)
   
   ### Please describe the bug 🐞
   
   `RowDelta.validate()` checks for conflicting concurrent data files by calling `validateNoConflictingDataFiles(cc, iceberg.AlwaysTrue{}, level)`. The `AlwaysTrue{}` filter matches **every** data file in every partition. As a result, a concurrent `FastAppend` to a completely different partition causes a `RowDelta` equality-delete commit to be rejected with `ErrConflictingDataFiles`, even though the two operations cannot possibly interfere with each other.
   
   In practice, any busy partitioned table with a streaming DELETE/UPDATE job (which uses `RowDelta` with equality-delete files) will almost never be able to commit under the default `SERIALIZABLE` isolation level, because some other writer is always appending to *some* partition.
   
   ## Affected component
   
   `table/row_delta.go` — `RowDelta.validate()`
   
   ## Steps to reproduce
   
   The test is self-contained. Save the file below as `table/bug_repro_rowdelta_partition_test.go` in an unmodified checkout of `main` and run:
   
   ```
   go test ./table/ -run TestBugRepro_RowDeltaFalseConflictDifferentPartition -v
   ```
   
   Expected output on unfixed upstream:
   
   ```
   --- FAIL: TestBugRepro_RowDeltaFalseConflictDifferentPartition
       bug_repro_rowdelta_partition_test.go:232:
           Error:   Received unexpected error:
                    commit failed, refresh and try again: concurrent data files added:
                    snapshot ... added data file .../worker-b-cold.parquet matching filter AlwaysTrue()
           Messages: serializable isolation must allow an eq-delete when the only concurrent data file is in a completely different partition (AlwaysTrue bug)
   ```
   
   The test sets up a **baseline** commit first (so Writer A starts from a non-empty base), which isolates this bug from the separate empty-base conflict-bypass bug. Worker B's data is in partition `category="cold"`; Writer A's equality-delete targets `category="hot"`. The two partitions are completely disjoint.
   
   Full source of `table/bug_repro_rowdelta_partition_test.go`:
   
   ```go
   package table_test
   
   import (
        "context"
        "fmt"
        "path/filepath"
        "sync"
        "testing"
   
        "github.com/apache/iceberg-go"
        iceio "github.com/apache/iceberg-go/io"
        "github.com/apache/iceberg-go/table"
        "github.com/stretchr/testify/require"
   )
   
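   // rowDeltaRepro3Catalog is a minimal in-memory catalog for the test: it
   // holds the current metadata and serializes loads and commits with a mutex.
   type rowDeltaRepro3Catalog struct {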
        mu       sync.Mutex
        current  table.Metadata
        location string
   }
   
   func newRowDeltaRepro3Catalog(meta table.Metadata, location string) *rowDeltaRepro3Catalog {
        return &rowDeltaRepro3Catalog{current: meta, location: location}
   }
   
   func (c *rowDeltaRepro3Catalog) LoadTable(_ context.Context, ident table.Identifier) (*table.Table, error) {
        c.mu.Lock()
        meta := c.current
        c.mu.Unlock()
        return table.New(
                ident, meta, c.location+"/metadata/v1.metadata.json",
                func(_ context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil },
                c,
        ), nil
   }
   
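   // CommitTable validates every requirement against the current metadata,
   // which acts as the compare-and-swap check, before applying the updates.
   func (c *rowDeltaRepro3Catalog) CommitTable(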
        _ context.Context,
        ident table.Identifier,
        reqs []table.Requirement,
        updates []table.Update,
   ) (table.Metadata, string, error) {
        c.mu.Lock()
        defer c.mu.Unlock()
        for _, req := range reqs {
                if err := req.Validate(c.current); err != nil {
                        return nil, "", fmt.Errorf("%w: CAS check failed: %v", table.ErrCommitFailed, err)
                }
        }
        newMeta, err := table.UpdateTableMetadata(c.current, updates, "")
        if err != nil {
                return nil, "", err
        }
        c.current = newMeta
        return newMeta, c.location + "/metadata/committed.metadata.json", nil
   }
   
   func makeRepro3DataFile(t *testing.T, spec *iceberg.PartitionSpec, path, category string) iceberg.DataFile {
        t.Helper()
        var partition map[int]any
        if category != "" {
                partition = map[int]any{1000: category}
        }
        b, err := iceberg.NewDataFileBuilder(
                *spec, iceberg.EntryContentData, path,
                iceberg.ParquetFile, partition, nil, nil, 100, 1024,
        )
        require.NoError(t, err)
        return b.Build()
   }
   
   func makeRepro3EqDeleteFile(t *testing.T, spec *iceberg.PartitionSpec, path, category string) iceberg.DataFile {
        t.Helper()
        var partition map[int]any
        if category != "" {
                partition = map[int]any{1000: category}
        }
        b, err := iceberg.NewDataFileBuilder(
                *spec, iceberg.EntryContentEqDeletes, path,
                iceberg.ParquetFile, partition, nil, nil, 5, 512,
        )
        require.NoError(t, err)
        return b.EqualityFieldIDs([]int{3}).Build()
   }
   
   func TestBugRepro_RowDeltaFalseConflictDifferentPartition(t *testing.T) {
        location := filepath.ToSlash(t.TempDir())
        ctx := context.Background()
   
        schema := iceberg.NewSchema(0,
                iceberg.NestedField{ID: 1, Name: "category", Type: iceberg.PrimitiveTypes.String, Required: false},
                iceberg.NestedField{ID: 2, Name: "value", Type: iceberg.PrimitiveTypes.Int64, Required: false},
                iceberg.NestedField{ID: 3, Name: "event_id", Type: iceberg.PrimitiveTypes.Int64, Required: false},
        )
        spec := iceberg.NewPartitionSpec(
                iceberg.PartitionField{
                        SourceIDs: []int{1},
                        FieldID:   1000,
                        Name:      "category",
                        Transform: iceberg.IdentityTransform{},
                },
        )
        props := iceberg.Properties{
                table.PropertyFormatVersion:        "2",
                table.CommitNumRetriesKey:          "1",
                table.CommitMinRetryWaitMsKey:      "0",
                table.CommitMaxRetryWaitMsKey:      "0",
                table.CommitTotalRetryTimeoutMsKey: "60000",
        }
   
        metaEmpty, err := table.NewMetadata(schema, &spec, table.UnsortedSortOrder, location, props)
        require.NoError(t, err)
   
        cat := newRowDeltaRepro3Catalog(metaEmpty, location)
        fsF := func(_ context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil }
   
        newTbl := func(base table.Metadata) *table.Table {
                return table.New([]string{"default", "repro3"}, base,
                        location+"/metadata/v1.metadata.json", fsF, cat)
        }
   
        // Step 1: Establish baseline snapshot S0 so Writer A starts from a
        // non-empty base (isolates AlwaysTrue bug from the empty-base bug).
        baseline := makeRepro3DataFile(t, &spec, location+"/data/baseline.parquet", "warm")
        txSetup := newTbl(metaEmpty).NewTransaction()
        require.NoError(t, txSetup.AddDataFiles(ctx, []iceberg.DataFile{baseline}, nil))
        _, err = txSetup.Commit(ctx)
        require.NoError(t, err, "baseline commit must succeed")
   
        cat.mu.Lock()
        metaS0 := cat.current
        cat.mu.Unlock()
   
        // Step 2: Worker B appends a data file in partition category="cold".
        workerBTbl, err := cat.LoadTable(ctx, []string{"default", "repro3"})
        require.NoError(t, err)
        dfCold := makeRepro3DataFile(t, &spec, location+"/data/worker-b-cold.parquet", "cold")
        txB := workerBTbl.NewTransaction()
        require.NoError(t, txB.AddDataFiles(ctx, []iceberg.DataFile{dfCold}, nil))
        _, err = txB.Commit(ctx)
        require.NoError(t, err, "Worker B must commit successfully")
   
        // Step 3: Writer A starts from S0, adds equality-delete in category="hot".
        writerATbl := newTbl(metaS0)
        eqDelHot := makeRepro3EqDeleteFile(t, &spec, location+"/data/writer-a-eq-del-hot.parquet", "hot")
        txA := writerATbl.NewTransaction()
        rd := txA.NewRowDelta(nil)
        rd.AddDeletes(eqDelHot)
        require.NoError(t, rd.Commit(ctx))
   
        // Step 4: Writer A commits.
        // Attempt 1: CAS fails (S0 vs Worker B's HEAD) → ErrCommitFailed.
        // Retry: newConflictContext returns [Worker_B_snapshot] as concurrent.
        //        RowDelta.validate calls validateNoConflictingDataFiles(AlwaysTrue{}).
        //        AlwaysTrue matches Worker B's "cold" file → ErrConflictingDataFiles.
        // Expected: no error ("cold" ≠ "hot", partitions are disjoint).
        _, err = txA.Commit(ctx)
        require.NoError(t, err,
                "serializable isolation must allow an eq-delete when the only concurrent "+
                        "data file is in a completely different partition (AlwaysTrue bug)")
   }
   ```
   
   ## Root cause
   
   The relevant section of `table/row_delta.go` on `main`:
   
   ```go
   // Comment in code:
   // "does not yet surface the bound predicate. AlwaysTrue is the 
   //  conservative fallback [...] Follow-up:"
   if err := validateNoConflictingDataFiles(cc, iceberg.AlwaysTrue{}, level); err != nil {
       return err
   }
   ```
   
   `validateNoConflictingDataFiles` iterates over every ADDED entry in every concurrent snapshot and reports a conflict for any data file whose partition satisfies the supplied filter. Because the filter is `iceberg.AlwaysTrue{}`, every data file in every partition satisfies it, so any concurrent append anywhere in the table is reported as a conflict to the `RowDelta` committer.
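
To make the failure mode concrete, here is a minimal, stdlib-only model of that check. It is a sketch, not the iceberg-go code: `dataFile`, `partitionFilter`, `alwaysTrue` and `conflictingFiles` are hypothetical names, each concurrent snapshot is reduced to the list of data files it added, and a partition is a single string value.

```go
package main

import "fmt"

// dataFile is a hypothetical stand-in for iceberg.DataFile: a path plus the
// value of a single identity partition field.
type dataFile struct {
	path      string
	partition string
}

// partitionFilter is a hypothetical stand-in for the conflict-detection
// expression: it reports whether a partition value satisfies the filter.
type partitionFilter func(partition string) bool

// alwaysTrue models iceberg.AlwaysTrue{}: every partition satisfies it.
func alwaysTrue(string) bool { return true }

// conflictingFiles models validateNoConflictingDataFiles: it walks the files
// ADDED by each concurrent snapshot and collects every file whose partition
// satisfies the filter. A non-empty result means the commit is rejected.
func conflictingFiles(concurrent [][]dataFile, filter partitionFilter) []string {
	var conflicts []string
	for _, added := range concurrent {
		for _, df := range added {
			if filter(df.partition) {
				conflicts = append(conflicts, df.path)
			}
		}
	}
	return conflicts
}

func main() {
	// Worker B's snapshot added a single file in category="cold".
	concurrent := [][]dataFile{
		{{path: "data/worker-b-cold.parquet", partition: "cold"}},
	}
	// Writer A only deletes in category="hot", but AlwaysTrue still flags
	// Worker B's file.
	fmt.Println(conflictingFiles(concurrent, alwaysTrue)) // prints [data/worker-b-cold.parquet]
}
```

The `category="cold"` file is reported even though Writer A only deletes in `category="hot"`, which is the same false conflict the reproducer hits.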
   
   The Java reference implementation (`MergingSnapshotProducer.validateNoNewDataFiles`) derives the conflict filter from the equality-delete predicate itself, so only data files that could contain rows matching the delete are checked. The Go implementation has not yet threaded that predicate through and falls back to `AlwaysTrue` as a temporary conservative measure. As a result, `SERIALIZABLE` `RowDelta` commits are effectively impossible on any active partitioned table, which makes the default isolation level unusable.
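
For intuition about what a predicate-derived filter buys over `AlwaysTrue`, the sketch below models an inclusive check: a concurrent file is reported only if its partition value and column bounds leave open the possibility that it holds rows matching the delete's condition. Everything here is hypothetical (`fileSummary`, `deleteCondition`, `mightMatch`, the `event_id` bounds) and is not meant to describe how the Java code evaluates its filter.

```go
package main

import "fmt"

// fileSummary is a hypothetical per-file summary: the identity partition value
// plus lower/upper bounds for the event_id column.
type fileSummary struct {
	path       string
	category   string
	minEventID int64
	maxEventID int64
}

// deleteCondition is a hypothetical stand-in for the condition a DELETE was
// planned from: category = <category> AND event_id IN (<eventIDs>).
type deleteCondition struct {
	category string
	eventIDs []int64
}

// mightMatch is an inclusive check: it returns false only when the summary
// proves that no row in the file can satisfy the condition.
func (c deleteCondition) mightMatch(f fileSummary) bool {
	if f.category != c.category {
		return false // different partition value: no row can match
	}
	for _, id := range c.eventIDs {
		if id >= f.minEventID && id <= f.maxEventID {
			return true // this ID may be present in the file
		}
	}
	return false // every ID lies outside the file's bounds
}

func main() {
	cond := deleteCondition{category: "hot", eventIDs: []int64{42}}
	concurrent := []fileSummary{
		{path: "worker-b-cold.parquet", category: "cold", minEventID: 1, maxEventID: 100},
		{path: "hot-high-ids.parquet", category: "hot", minEventID: 500, maxEventID: 900},
		{path: "hot-low-ids.parquet", category: "hot", minEventID: 1, maxEventID: 100},
	}
	for _, f := range concurrent {
		fmt.Printf("%s conflict=%v\n", f.path, cond.mightMatch(f))
	}
}
```

It prints `conflict=false` for `worker-b-cold.parquet` and `hot-high-ids.parquet`, and `conflict=true` only for `hot-low-ids.parquet`, whose `event_id` bounds include 42.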
   
   ## Expected behaviour
   
   The conflict filter should be scoped to the partition(s) covered by the equality-delete files in the `RowDelta`. A concurrent append to a completely different partition cannot be affected by the equality deletes and must not block the commit. Only concurrent data files whose partition intersects with the equality-delete partition(s) should be flagged as a conflict.
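
A possible shape for such a partition-scoped filter is sketched below. It leans on the Iceberg spec rule that an equality delete applies to data files with the same partition spec ID and partition values, or to every data file when the delete's spec is unpartitioned. The `file` type, the string-encoded partition tuple and `scopedConflictFilter` are hypothetical; a real fix in iceberg-go would work with its own `DataFile` and expression types.

```go
package main

import "fmt"

// file is a hypothetical stand-in for a data or equality-delete file. tuple is
// the partition tuple rendered as a string; "" means an unpartitioned spec.
type file struct {
	path   string
	specID int
	tuple  string
}

// scopedConflictFilter builds a conflict check from the equality-delete files
// in a RowDelta. It stays conservative (reports a conflict) whenever it cannot
// prove that a concurrent data file lies outside every deleted partition.
func scopedConflictFilter(eqDeletes []file) func(file) bool {
	type key struct {
		specID int
		tuple  string
	}
	keys := make(map[key]bool)
	specs := make(map[int]bool)
	for _, d := range eqDeletes {
		if d.tuple == "" {
			// An unpartitioned equality delete applies to every data file.
			return func(file) bool { return true }
		}
		keys[key{d.specID, d.tuple}] = true
		specs[d.specID] = true
	}
	return func(df file) bool {
		if !specs[df.specID] {
			// Different spec (or no equality deletes at all): the tuples are
			// not comparable, so fall back to AlwaysTrue behaviour.
			return true
		}
		return keys[key{df.specID, df.tuple}]
	}
}

func main() {
	conflicts := scopedConflictFilter([]file{
		{path: "writer-a-eq-del-hot.parquet", specID: 0, tuple: "category=hot"},
	})
	concurrent := []file{
		{path: "worker-b-cold.parquet", specID: 0, tuple: "category=cold"},
		{path: "other-hot.parquet", specID: 0, tuple: "category=hot"},
	}
	for _, df := range concurrent {
		fmt.Printf("%s conflict=%v\n", df.path, conflicts(df))
	}
}
```

With Writer A's single delete in `category=hot`, Worker B's `category=cold` file is no longer reported, while another `category=hot` file still is. Files written under a different spec are still reported, because their partition tuples cannot be compared directly. Like the issue itself, the sketch assumes the delete's condition is confined to the partitions its equality-delete files were written to.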
   

