mzzz-zzm opened a new issue, #978:
URL: https://github.com/apache/iceberg-go/issues/978
### Apache Iceberg version
main (development)
### Please describe the bug 🐞
`RowDelta.validate()` checks for conflicting concurrent data files by
calling `validateNoConflictingDataFiles(cc, iceberg.AlwaysTrue{}, level)`. The
`AlwaysTrue{}` filter matches **every** data file in every partition. As a
result, a concurrent `FastAppend` to a completely different partition causes a
`RowDelta` equality-delete commit to be rejected with
`ErrConflictingDataFiles`, even though the two operations cannot possibly
interfere with each other.
In practice, any busy partitioned table with a streaming DELETE/UPDATE job
(which uses `RowDelta` with equality-delete files) will almost never be able to
commit under the default `SERIALIZABLE` isolation level, because some other
writer is always appending to *some* partition.
## Affected component
`table/row_delta.go` — `RowDelta.validate()`
## Steps to reproduce
The test is self-contained. Save the file below as
`table/bug_repro_rowdelta_partition_test.go` in an unmodified checkout of
`main` and run:
```
go test ./table/ -run TestBugRepro_RowDeltaFalseConflictDifferentPartition -v
```
Expected output on unfixed upstream:
```
--- FAIL: TestBugRepro_RowDeltaFalseConflictDifferentPartition
bug_repro_rowdelta_partition_test.go:232:
Error: Received unexpected error:
commit failed, refresh and try again: concurrent data files added:
snapshot ... added data file .../worker-b-cold.parquet
matching filter AlwaysTrue()
Messages: serializable isolation must allow an eq-delete when the only concurrent data file is in a completely different partition (AlwaysTrue bug)
```
The test sets up a **baseline** commit first (so Writer A starts from a
non-empty base), which isolates this bug from the separate empty-base
conflict-bypass bug. Worker B's data is in partition `category="cold"`;
Writer A's equality-delete targets `category="hot"`. The two partitions are
completely disjoint.
Full source of `table/bug_repro_rowdelta_partition_test.go`:
```go
package table_test

import (
	"context"
	"fmt"
	"path/filepath"
	"sync"
	"testing"

	"github.com/apache/iceberg-go"
	iceio "github.com/apache/iceberg-go/io"
	"github.com/apache/iceberg-go/table"
	"github.com/stretchr/testify/require"
)

type rowDeltaRepro3Catalog struct {
	mu       sync.Mutex
	current  table.Metadata
	location string
}

func newRowDeltaRepro3Catalog(meta table.Metadata, location string) *rowDeltaRepro3Catalog {
	return &rowDeltaRepro3Catalog{current: meta, location: location}
}

func (c *rowDeltaRepro3Catalog) LoadTable(_ context.Context, ident table.Identifier) (*table.Table, error) {
	c.mu.Lock()
	meta := c.current
	c.mu.Unlock()

	return table.New(
		ident, meta, c.location+"/metadata/v1.metadata.json",
		func(_ context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil },
		c,
	), nil
}

func (c *rowDeltaRepro3Catalog) CommitTable(
	_ context.Context,
	ident table.Identifier,
	reqs []table.Requirement,
	updates []table.Update,
) (table.Metadata, string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for _, req := range reqs {
		if err := req.Validate(c.current); err != nil {
			return nil, "", fmt.Errorf("%w: CAS check failed: %v",
				table.ErrCommitFailed, err)
		}
	}

	newMeta, err := table.UpdateTableMetadata(c.current, updates, "")
	if err != nil {
		return nil, "", err
	}
	c.current = newMeta

	return newMeta, c.location + "/metadata/committed.metadata.json", nil
}

func makeRepro3DataFile(t *testing.T, spec *iceberg.PartitionSpec, path, category string) iceberg.DataFile {
	t.Helper()

	var partition map[int]any
	if category != "" {
		partition = map[int]any{1000: category}
	}

	b, err := iceberg.NewDataFileBuilder(
		*spec, iceberg.EntryContentData, path,
		iceberg.ParquetFile, partition, nil, nil, 100, 1024,
	)
	require.NoError(t, err)

	return b.Build()
}

func makeRepro3EqDeleteFile(t *testing.T, spec *iceberg.PartitionSpec, path, category string) iceberg.DataFile {
	t.Helper()

	var partition map[int]any
	if category != "" {
		partition = map[int]any{1000: category}
	}

	b, err := iceberg.NewDataFileBuilder(
		*spec, iceberg.EntryContentEqDeletes, path,
		iceberg.ParquetFile, partition, nil, nil, 5, 512,
	)
	require.NoError(t, err)

	return b.EqualityFieldIDs([]int{3}).Build()
}

func TestBugRepro_RowDeltaFalseConflictDifferentPartition(t *testing.T) {
	location := filepath.ToSlash(t.TempDir())
	ctx := context.Background()

	schema := iceberg.NewSchema(0,
		iceberg.NestedField{ID: 1, Name: "category", Type: iceberg.PrimitiveTypes.String, Required: false},
		iceberg.NestedField{ID: 2, Name: "value", Type: iceberg.PrimitiveTypes.Int64, Required: false},
		iceberg.NestedField{ID: 3, Name: "event_id", Type: iceberg.PrimitiveTypes.Int64, Required: false},
	)
	spec := iceberg.NewPartitionSpec(
		iceberg.PartitionField{
			SourceIDs: []int{1},
			FieldID:   1000,
			Name:      "category",
			Transform: iceberg.IdentityTransform{},
		},
	)
	props := iceberg.Properties{
		table.PropertyFormatVersion:        "2",
		table.CommitNumRetriesKey:          "1",
		table.CommitMinRetryWaitMsKey:      "0",
		table.CommitMaxRetryWaitMsKey:      "0",
		table.CommitTotalRetryTimeoutMsKey: "60000",
	}

	metaEmpty, err := table.NewMetadata(schema, &spec,
		table.UnsortedSortOrder, location, props)
	require.NoError(t, err)

	cat := newRowDeltaRepro3Catalog(metaEmpty, location)
	fsF := func(_ context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil }
	newTbl := func(base table.Metadata) *table.Table {
		return table.New([]string{"default", "repro3"}, base,
			location+"/metadata/v1.metadata.json", fsF, cat)
	}

	// Step 1: Establish baseline snapshot S0 so Writer A starts from a
	// non-empty base (isolates AlwaysTrue bug from the empty-base bug).
	baseline := makeRepro3DataFile(t, &spec,
		location+"/data/baseline.parquet", "warm")
	txSetup := newTbl(metaEmpty).NewTransaction()
	require.NoError(t, txSetup.AddDataFiles(ctx,
		[]iceberg.DataFile{baseline}, nil))
	_, err = txSetup.Commit(ctx)
	require.NoError(t, err, "baseline commit must succeed")

	cat.mu.Lock()
	metaS0 := cat.current
	cat.mu.Unlock()

	// Step 2: Worker B appends a data file in partition category="cold".
	workerBTbl, err := cat.LoadTable(ctx, []string{"default", "repro3"})
	require.NoError(t, err)
	dfCold := makeRepro3DataFile(t, &spec,
		location+"/data/worker-b-cold.parquet", "cold")
	txB := workerBTbl.NewTransaction()
	require.NoError(t, txB.AddDataFiles(ctx, []iceberg.DataFile{dfCold}, nil))
	_, err = txB.Commit(ctx)
	require.NoError(t, err, "Worker B must commit successfully")

	// Step 3: Writer A starts from S0, adds equality-delete in category="hot".
	writerATbl := newTbl(metaS0)
	eqDelHot := makeRepro3EqDeleteFile(t, &spec,
		location+"/data/writer-a-eq-del-hot.parquet", "hot")
	txA := writerATbl.NewTransaction()
	rd := txA.NewRowDelta(nil)
	rd.AddDeletes(eqDelHot)
	require.NoError(t, rd.Commit(ctx))

	// Step 4: Writer A commits.
	// Attempt 1: CAS fails (S0 vs Worker B's HEAD) → ErrCommitFailed.
	// Retry: newConflictContext returns [Worker_B_snapshot] as concurrent.
	// RowDelta.validate calls validateNoConflictingDataFiles(AlwaysTrue{}).
	// AlwaysTrue matches Worker B's "cold" file → ErrConflictingDataFiles.
	// Expected: no error ("cold" ≠ "hot", partitions are disjoint).
	_, err = txA.Commit(ctx)
	require.NoError(t, err,
		"serializable isolation must allow an eq-delete when the only concurrent "+
			"data file is in a completely different partition (AlwaysTrue bug)")
}
```
## Root cause
The relevant section of `table/row_delta.go` on `main`:
```go
// Comment in code:
// "does not yet surface the bound predicate. AlwaysTrue is the
// conservative fallback [...] Follow-up:"
if err := validateNoConflictingDataFiles(cc, iceberg.AlwaysTrue{}, level); err != nil {
	return err
}
```
`validateNoConflictingDataFiles` iterates over every ADDED entry in every
concurrent snapshot and reports a conflict for any data file whose partition
satisfies the supplied filter.
Because the filter is `iceberg.AlwaysTrue{}`, every data file in every
partition satisfies it — turning any concurrent append anywhere in the table
into a reported conflict for the `RowDelta` committer.
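The effect of the catch-all filter can be illustrated with a minimal, self-contained sketch. It does not use the iceberg-go API: `addedFile`, `partitionFilter`, `alwaysTrue` and `findConflicts` are hypothetical stand-ins that only model the behaviour described above, assuming a single identity-partition value per file.

```go
package main

import "fmt"

// addedFile is a hypothetical stand-in for a data file added by a concurrent
// snapshot. Only its identity-partition value matters for this sketch.
type addedFile struct {
	path      string
	partition string
}

// partitionFilter is a hypothetical predicate over a file's partition value.
type partitionFilter func(partition string) bool

// alwaysTrue models a catch-all filter: it matches every partition.
func alwaysTrue(string) bool { return true }

// findConflicts returns every concurrently added file whose partition
// satisfies the filter. In this model, a non-empty result rejects the commit.
func findConflicts(concurrent []addedFile, filter partitionFilter) []addedFile {
	var out []addedFile
	for _, f := range concurrent {
		if filter(f.partition) {
			out = append(out, f)
		}
	}
	return out
}

func main() {
	concurrent := []addedFile{{path: "worker-b-cold.parquet", partition: "cold"}}

	// Catch-all filter: the unrelated "cold" append is flagged.
	fmt.Println(len(findConflicts(concurrent, alwaysTrue))) // prints 1

	// Filter scoped to the delete's partition ("hot"): nothing is flagged.
	onlyHot := func(p string) bool { return p == "hot" }
	fmt.Println(len(findConflicts(concurrent, onlyHot))) // prints 0
}
```

With the catch-all filter the unrelated append counts as a conflict. The same loop with a filter narrowed to `"hot"` lets the commit through.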
The Java reference implementation
(`MergingSnapshotProducer.validateNoNewDataFiles`) derives the conflict filter
from the equality-delete predicate itself, so only rows that could match the
delete are checked. The Go implementation has not yet threaded that predicate
through and falls back to `AlwaysTrue` as a temporary conservative measure. As
a result, `SERIALIZABLE` `RowDelta` commits are effectively impossible on any
active partitioned table, which makes the default isolation level unusable.
## Expected behaviour
The conflict filter should be scoped to the partition(s) covered by the
equality-delete files in the `RowDelta`. A concurrent append to a completely
different partition cannot be affected by the equality deletes and must not
block the commit. Only concurrent data files whose partition intersects with
the equality-delete partition(s) should be flagged as a conflict.
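One possible shape for the scoped check is sketched below. This is not the iceberg-go implementation or a proposed patch: the `file` type and the `deletePartitions` and `conflictingAppends` helpers are hypothetical, and the sketch assumes a single partition spec with one identity-partition value per file. Partition spec evolution and equality deletes written without a partition value (which would likely have to keep matching everything) are out of scope here.

```go
package main

import "fmt"

// file is a hypothetical, simplified view of a data or delete file:
// a path plus one identity-partition value (e.g. the "category" column).
type file struct {
	path      string
	partition string
}

// deletePartitions collects the set of partition values covered by the
// equality-delete files staged in the row delta.
func deletePartitions(deletes []file) map[string]struct{} {
	set := make(map[string]struct{}, len(deletes))
	for _, d := range deletes {
		set[d.partition] = struct{}{}
	}
	return set
}

// conflictingAppends returns the concurrently added data files whose
// partition intersects the partitions covered by the equality deletes.
func conflictingAppends(deletes, concurrent []file) []file {
	covered := deletePartitions(deletes)
	var out []file
	for _, f := range concurrent {
		if _, ok := covered[f.partition]; ok {
			out = append(out, f)
		}
	}
	return out
}

func main() {
	deletes := []file{{path: "writer-a-eq-del-hot.parquet", partition: "hot"}}

	// Append to a disjoint partition: no conflict, commit may proceed.
	cold := []file{{path: "worker-b-cold.parquet", partition: "cold"}}
	fmt.Println(len(conflictingAppends(deletes, cold))) // prints 0

	// Append to the partition covered by the delete: still a conflict.
	hot := []file{{path: "worker-c-hot.parquet", partition: "hot"}}
	fmt.Println(len(conflictingAppends(deletes, hot))) // prints 1
}
```

Under these assumptions the scenario from the reproduction (delete in `"hot"`, append in `"cold"`) passes validation, while an append into `"hot"` is still rejected.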